Posted to users@kafka.apache.org by Ian Duffy <ia...@ianduffy.ie> on 2017/04/24 09:38:16 UTC

Stream applications dying on broker ISR change

Hi All,

We're running multiple Kafka Streams applications using Kafka client
0.10.2.0 against a 6-node broker cluster running 0.10.1.1.
Additionally, we're running Kafka Connect 0.10.2.0 with the Elasticsearch
connector from Confluent [1].

When an ISR change occurred on the brokers, all of the streams applications
and the Kafka Connect ES connector threw exceptions and never recovered.

We've seen a correlation between Kafka Broker ISR change and stream
applications dying.

The streams applications log the following and fail to recover:

07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
07:01:23.323 stream-processor /var/log/application.log  2017-04-24 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application - Unexpected Exception caught in thread [StreamThread-1]:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=81, offset=479285
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3] o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING to NOT_RUNNING
07:01:23.558 stream-processor /var/log/application.log  2017-04-24 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application - Unexpected Exception caught in thread [StreamThread-3]:
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000, topic=kafka-topic, partition=55, offset=479308
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Are we potentially doing something wrong with our streams
configuration/usage? Or does this look like a bug?

Thanks,
Ian.

[1] https://github.com/confluentinc/kafka-connect-elasticsearch

Re: Stream applications dying on broker ISR change

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sachin,

When an instance is stopped, it shuts down the underlying heartbeat thread
as part of the stopping process, so the coordinator will realize it's
leaving the group.

As for non-graceful stops, say a bug in the stream app code causes a
thread to die: currently the Streams library captures most exceptions,
and we rely on global error handling for unexpected ones. This is
admittedly not ideal, and we are working on finer-grained error handling
to fix such issues.
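[Editor's note: the "global error handling" above can be wired up via an
uncaught-exception handler, as KafkaStreams#setUncaughtExceptionHandler
allows. The sketch below models the same pattern with a plain thread; the
class and method names are hypothetical, not Kafka code.]

```java
import java.util.concurrent.atomic.AtomicReference;

public class GlobalErrorHandling {
    // Captures the uncaught exception from a dying worker thread, mirroring
    // what KafkaStreams#setUncaughtExceptionHandler lets an application do.
    public static Throwable runAndCapture(Runnable work) throws InterruptedException {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(work, "StreamThread-sketch");
        // Handler runs when the thread dies with an unhandled exception.
        worker.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
        worker.start();
        worker.join();
        return failure.get(); // null if the thread finished normally
    }
}
```

Without such a handler, a StreamThread death would otherwise go unnoticed by
the application until processing silently stops.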

Guozhang

On Mon, Apr 24, 2017 at 7:34 AM, Sachin Mittal <sj...@gmail.com> wrote:

> I had a question about this setting:
> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
>
> How would the broker know if a thread has died, or if we simply stopped an
> instance and it needs to be booted out of the group?
>
> Thanks
> Sachin
>
>
> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Hi Ian,
> >
> >
> > This is now fixed in 0.10.2.1. The default configuration needs tweaking.
> > If you can't pick that up (it's currently being voted on), make sure you
> > have these two parameters set as follows in your streams config:
> >
> > final Properties props = new Properties();
> > ...
> > // increase retries to 10 from the default of 0
> > props.put(ProducerConfig.RETRIES_CONFIG, 10);
> > // raise from the default of 300 s to effectively infinite
> > props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> >           Integer.toString(Integer.MAX_VALUE));
> >
> > Thanks
> > Eno
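
[Editor's note: the two settings quoted above, assembled into a complete,
compilable sketch. The application id, bootstrap servers, and class name are
hypothetical placeholders, and plain string config keys stand in for the
ProducerConfig/ConsumerConfig constants used in the email.]

```java
import java.util.Properties;

public class StreamsWorkaroundConfig {
    // Sketch of the suggested workaround for pre-0.10.2.1 clients.
    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("application.id", "my-app");          // placeholder
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        // Retry transient produce errors, such as the
        // NotLeaderForPartitionException raised during an ISR change,
        // instead of failing the task immediately (default is 0).
        props.put("retries", "10");
        // Raise the poll-interval timeout from the 300 s default so a slow
        // rebalance or restore cannot kick the member out of the group.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }
}
```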



-- 
-- Guozhang

Re: Stream applications dying on broker ISR change

Posted by Ian Duffy <ia...@ianduffy.ie>.
Hi Eno,

Looks like we just didn't wait long enough. It eventually recovered and
started processing again.

Thanks for all the fantastic work in the 0.10.2.1 client.

On 25 April 2017 at 18:12, Eno Thereska <en...@gmail.com> wrote:

> Hi Ian,
>
> Any chance you could share the full log? Feel free to send it to me
> directly if you don't want to broadcast it everywhere.
>
> Thanks
> Eno
>
>
> > On 25 Apr 2017, at 17:36, Ian Duffy <ia...@ianduffy.ie> wrote:
> >
> > Thanks again for the quick response Eno.
> >
> > We just left the application running in the hope it would recover; after
> > ~1 hour it's still continuously spilling out the same exception and not
> > managing to continue processing.
> >
> > On 25 April 2017 at 16:24, Eno Thereska <en...@gmail.com> wrote:
> >
> >> Hi Ian,
> >>
> >> Retries are sometimes expected and don't always indicate a problem. We
> >> should probably adjust the printing of the messages to not print this
> >> warning frequently. Are you seeing any crash or does the app proceed?
> >>
> >> Thanks
> >> Eno
> >>
> >> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <ia...@ianduffy.ie> wrote:
> >>
> >> Upgraded a handful of our streams applications to 0.10.2.1 as suggested.
> >> We're seeing far fewer issues, much smoother performance, and they
> >> withstood ISR changes.
> >>
> >> We saw the following when more consumers were added to a consumer group:
> >>
> >> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2]
> >> o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
> >> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock
> >> the state directory for task 1_21
> >>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
> >>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> >>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> >>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> >>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> >>
> >>
> >>
> >> On 24 April 2017 at 16:02, Eno Thereska <en...@gmail.com> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> In KIP-62 a background heartbeat thread was introduced to deal with
> >>> group protocol arrivals and departures. There is a setting called
> >>> session.timeout.ms that specifies the timeout of that background thread,
> >>> so if a stream thread has died, the background thread will also die and
> >>> the coordinator will do the right thing.
> >>>
> >>> Eno
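
[Editor's note: a toy model, not Kafka code, of the KIP-62 behavior
described above: liveness is judged by heartbeat recency against
session.timeout.ms, independently of how long processing between poll()
calls takes, which is why max.poll.interval.ms can be raised safely. The
class name and values below are hypothetical.]

```java
import java.util.concurrent.atomic.AtomicLong;

public class HeartbeatMonitor {
    private final long sessionTimeoutMs;
    private final AtomicLong lastHeartbeatMs = new AtomicLong();

    public HeartbeatMonitor(long sessionTimeoutMs, long nowMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.lastHeartbeatMs.set(nowMs);
    }

    // Called periodically by the background heartbeat thread while the
    // member process is alive, even if poll() processing is slow.
    public void heartbeat(long nowMs) { lastHeartbeatMs.set(nowMs); }

    // True once the member has been silent for a full session timeout,
    // which is when the coordinator would boot it from the group.
    public boolean isEvicted(long nowMs) {
        return nowMs - lastHeartbeatMs.get() > sessionTimeoutMs;
    }
}
```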

Re: Stream applications dying on broker ISR change

Posted by Eno Thereska <en...@gmail.com>.
Hi Ian,

Any chance you could share the full log? Feel free to send it to me directly if you don't want to broadcast it everywhere.

Thanks
Eno


> On 25 Apr 2017, at 17:36, Ian Duffy <ia...@ianduffy.ie> wrote:
> 
> Thanks again for the quick response Eno.
> 
> We just left the application running in the hope it would recover; After
> ~1hour it's still just continuously spilling out the same exception and not
> managing to continue processing.
> 
> On 25 April 2017 at 16:24, Eno Thereska <en...@gmail.com> wrote:
> 
>> Hi Ian,
>> 
>> Retries are sometimes expected and don't always indicate a problem. We
>> should probably adjust the printing of the messages to not print this
>> warning frequently. Are you seeing any crash or does the app proceed?
>> 
>> Thanks
>> Eno
>> 
>> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <ia...@ianduffy.ie> wrote:
>> 
>> Upgraded a handful of our streams applications to 0.10.2.1 as suggested.
>> Seeing much less issues and much smoother performance.
>> They withstood ISR changes.
>> 
>> Seen the following when more consumers were added to a consumer group:
>> 
>> 2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2]
>> o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
>> org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock
>> the state directory for task 1_21
>> at
>> org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.<init>(ProcessorStateManager.java:100)
>> at
>> org.apache.kafka.streams.processor.internals.AbstractTask.<
>> init>(AbstractTask.java:73)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.<
>> init>(StreamTask.java:108)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.
>> createStreamTask(StreamThread.java:864)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$
>> TaskCreator.createTask(StreamThread.java:1237)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$Ab
>> stractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.
>> addStreamTasks(StreamThread.java:967)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.
>> access$600(StreamThread.java:69)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread$1.
>> onPartitionsAssigned(StreamThread.java:234)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> tor.onJoinComplete(ConsumerCoordinator.java:259)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordina
>> tor.joinGroupIfNeeded(AbstractCoordinator.java:352)
>> at
>> org.apache.kafka.clients.consumer.internals.AbstractCoordina
>> tor.ensureActiveGroup(AbstractCoordinator.java:303)
>> at
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordina
>> tor.poll(ConsumerCoordinator.java:290)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
>> KafkaConsumer.java:1029)
>> at
>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
>> KafkaConsumer.java:995)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.
>> runLoop(StreamThread.java:592)
>> at
>> org.apache.kafka.streams.processor.internals.StreamThread.
>> run(StreamThread.java:361)
>> 
>> 
>> 
>> On 24 April 2017 at 16:02, Eno Thereska <en...@gmail.com> wrote:
>> 
>>> Hi Sachin,
>>> 
>>> In KIP-62 a background heartbeat thread was introduced to deal with the
>>> group protocol arrivals and departures. There is a setting called
>>> session.timeout.ms that specifies the timeout of that background thread.
>>> So if the thread has died that background thread will also die and the
>>> right thing will happen.
>>> 
>>> Eno
>>> 
>>>> On 24 Apr 2017, at 15:34, Sachin Mittal <sj...@gmail.com> wrote:
>>>> 
>>>> I had a question about this setting
>>>> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>> Integer.toString(Integer.MAX_
>>>> VALUE)
>>>> 
>>>> How would the broker know if a thread has died or say we simply stopped
>>> an
>>>> instance and needs to be booted out of the group.
>>>> 
>>>> Thanks
>>>> Sachin
>>>> 
>>>> 
>>>> On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <en...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Ian,
>>>>> 
>>>>> 
>>>>> This is now fixed in 0.10.2.1. The default configuration need
>> tweaking.
>>> If
>>>>> you can't pick that up (it's currently being voted), make sure you
>> have
>>>>> these two parameters set as follows in your streams config:
>>>>> 
>>>>> final Properties props = new Properties();
>>>>> ...
>>>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10
>>> from
>>>>> default of 0
>>>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>>>> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
>>>>> from default of 300 s
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>>> On 24 Apr 2017, at 10:38, Ian Duffy <ia...@ianduffy.ie> wrote:
>>>>>> 
>>>>>> Hi All,
>>>>>> 
>>>>>> We're running multiple Kafka Stream applications using Kafka client
>>>>>> 0.10.2.0 against a 6 node broker cluster running 0.10.1.1
>>>>>> Additionally, we're running Kafka Connect 0.10.2.0 with the
>>> ElasticSearch
>>>>>> connector by confluent [1]
>>>>>> 
>>>>>> On an ISR change occurring on the brokers, all of the streams
>>>>> applications
>>>>>> and the Kafka connect ES connector threw exceptions and never
>>> recovered.
>>>>>> 
>>>>>> We've seen a correlation between Kafka Broker ISR change and stream
>>>>>> applications dying.
>>>>>> 
>>>>>> The logs from the streams applications throw out the following and
>> fail
>>>>> to
>>>>>> recover:
>>>>>> 
>>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
>>>>>> 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1]
>>>>>> o.a.k.s.p.internals.StreamThread - Unexpected state transition from
>>>>>> RUNNING to NOT_RUNNING
>>>>>> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
>>>>>> 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application -
>>>>>> Unexpected Exception caught in thread [StreamThread-1]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in
>>>>>> process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000,
>>>>>> topic=kafka-topic, partition=81, offset=479285
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81]
>>>>>> exception caught when producing
>>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>> ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>>>> This server is not the leader for that topic-partition.
>>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
>>>>>> 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3]
>>>>>> o.a.k.s.p.internals.StreamThread - Unexpected state transition from
>>>>>> RUNNING to NOT_RUNNING
>>>>>> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
>>>>>> 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application -
>>>>>> Unexpected Exception caught in thread [StreamThread-3]:
>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in
>>>>>> process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000,
>>>>>> topic=kafka-topic, partition=55, offset=479308
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55]
>>>>>> exception caught when producing
>>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
>>>>>> at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
>>>>>> at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>> at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
>>>>>> at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
>>>>>> at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
>>>>>> at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
>>>>>> ... 2 common frames omitted
>>>>>> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>>>> This server is not the leader for that topic-partition.
>>>>>> 
>>>>>> Are we potentially doing something wrong with our streams
>>>>>> configuration/usage? Or does this look like a bug?
>>>>>> 
>>>>>> Thanks,
>>>>>> Ian.
>>>>>> 
>>>>>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch


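The failure mode described above, a StreamThread dying on an unhandled exception and never recovering, is easier to observe if the application registers an uncaught-exception handler; Kafka Streams exposes the equivalent hook via KafkaStreams#setUncaughtExceptionHandler. Below is a minimal stdlib-only sketch of the pattern; the thread name and exception message merely mirror the logs in this thread and are illustrative, not Kafka code:

```java
public class DyingThreadWatcher {
    // Runs a worker that dies with an unhandled exception and reports it via
    // an uncaught-exception handler instead of letting the failure pass silently.
    static String runAndCapture(Runnable failingWork, String threadName)
            throws InterruptedException {
        final String[] captured = new String[1];
        Thread worker = new Thread(failingWork, threadName);
        worker.setUncaughtExceptionHandler((t, e) ->
                captured[0] = t.getName() + ": " + e.getMessage());
        worker.start();
        worker.join();            // join() gives us visibility of captured[0]
        return captured[0];
    }

    public static void main(String[] args) throws InterruptedException {
        // Illustrative failure mirroring the NotLeaderForPartitionException
        // message from the logs above.
        String report = runAndCapture(() -> {
            throw new IllegalStateException(
                    "This server is not the leader for that topic-partition.");
        }, "StreamThread-1");
        System.out.println(report);
    }
}
```

In a real Streams application the handler would typically alert and shut the process down cleanly (or restart it), rather than leaving a half-dead instance in the consumer group.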
Re: Stream applications dying on broker ISR change

Posted by Ian Duffy <ia...@ianduffy.ie>.
Thanks again for the quick response Eno.

We just left the application running in the hope it would recover; after
~1 hour it was still continuously emitting the same exception and never
managed to resume processing.

On 25 April 2017 at 16:24, Eno Thereska <en...@gmail.com> wrote:

> Hi Ian,
>
> Retries are sometimes expected and don't always indicate a problem. We
> should probably adjust the printing of the messages to not print this
> warning frequently. Are you seeing any crash or does the app proceed?
>
> Thanks
> Eno
>
> On 25 Apr 2017 4:02 p.m., "Ian Duffy" <ia...@ianduffy.ie> wrote:
>
> [Ian's 0.10.2.1 upgrade message quoted in full; snipped here, as it appears
> as its own post in this thread.]
>
>
> On 24 April 2017 at 16:02, Eno Thereska <en...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > In KIP-62 a background heartbeat thread was introduced to deal with
> > group protocol arrivals and departures. There is a setting called
> > session.timeout.ms that specifies the timeout of that background thread.
> > So if the stream thread has died, that background thread will also die
> > and the right thing will happen.
> >
> > Eno
> >
> > > On 24 Apr 2017, at 15:34, Sachin Mittal <sj...@gmail.com> wrote:
> > >
> > > I had a question about this setting:
> > > ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)
> > >
> > > How would the broker know if a thread has died, or say we simply stopped
> > > an instance and it needs to be booted out of the group?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <en...@gmail.com>
> > > wrote:
> > >
> > >> Hi Ian,
> > >>
> > >>
> > >> This is now fixed in 0.10.2.1. The default configuration needs tweaking.
> > >> If you can't pick that up (it's currently being voted on), make sure you
> > >> have these two parameters set as follows in your streams config:
> > >>
> > >> final Properties props = new Properties();
> > >> ...
> > >> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10 from default of 0
> > >> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> > >>           Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from default of 300 s
> > >>
> > >> Thanks
> > >> Eno
> > >>
> > >>
> > >>> On 24 Apr 2017, at 10:38, Ian Duffy <ia...@ianduffy.ie> wrote:
> > >>>
> > >>> [Ian's original message quoted in full; snipped here, as it appears at
> > >>> the start of this thread.]
>
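Eno's two recommended settings, quoted above, can be collected into a small self-contained sketch. To keep it runnable without the Kafka client on the classpath, it uses the plain string names behind ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG; in a real application you would use those constants and the rest of your StreamsConfig:

```java
import java.util.Properties;

public class StreamsResilienceConfig {
    // Builds the two settings recommended in the thread for pre-0.10.2.1
    // clients. The string keys are the config names behind the constants
    // ProducerConfig.RETRIES_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.
    static Properties resilienceProps() {
        Properties props = new Properties();
        // Producer retries: raise from the old default of 0 so transient
        // NotLeaderForPartitionException errors during an ISR change / leader
        // election are retried instead of killing the stream thread.
        props.put("retries", "10");
        // Consumer max.poll.interval.ms: effectively disable poll-interval
        // eviction (the default was 300 s). Liveness is still covered by the
        // KIP-62 background heartbeat thread and session.timeout.ms.
        props.put("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = resilienceProps();
        System.out.println(props.getProperty("retries"));
        System.out.println(props.getProperty("max.poll.interval.ms"));
    }
}
```

As discussed later in the thread, disabling the poll-interval check is safe only because the separate heartbeat thread still detects a dead or stopped instance via session.timeout.ms.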

Re: Stream applications dying on broker ISR change

Posted by Eno Thereska <en...@gmail.com>.
Hi Ian,

Retries are sometimes expected and don't always indicate a problem. We
should probably adjust the printing of the messages to not print this
warning frequently. Are you seeing any crash or does the app proceed?

Thanks
Eno

On 25 Apr 2017 4:02 p.m., "Ian Duffy" <ia...@ianduffy.ie> wrote:

[Ian's 0.10.2.1 upgrade message and the earlier quote chain snipped; both
appear in full elsewhere in this thread.]

Re: Stream applications dying on broker ISR change

Posted by Ian Duffy <ia...@ianduffy.ie>.
Upgraded a handful of our streams applications to 0.10.2.1 as suggested.
Seeing far fewer issues and much smoother performance.
They withstood ISR changes.

Seen the following when more consumers were added to a consumer group:

2017-04-25 14:57:37,200 - [WARN] - [1.1.0-11] - [StreamThread-2]
o.a.k.s.p.internals.StreamThread - Could not create task 1_21. Will retry.
org.apache.kafka.streams.errors.LockException: task [1_21] Failed to lock
the state directory for task 1_21
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
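The LockException above comes from Streams retrying with backoff while a task's state directory is still locked by its previous owner during a rebalance (the retryWithBackoff frame in the trace). A hypothetical stdlib-only sketch of that pattern is below; the file names, attempt counts, and timings are illustrative and not the actual Streams implementation:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class StateDirLockRetry {
    // Tries to take an exclusive lock on a task's state-directory lock file,
    // backing off between attempts while the previous owner still holds it.
    // Returns null if the lock never frees up, mirroring the "Could not create
    // task ... Will retry" warning followed by a LockException.
    static FileLock acquireWithBackoff(Path lockFile, int attempts, long backoffMs)
            throws IOException, InterruptedException {
        FileChannel channel = FileChannel.open(lockFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        for (int i = 0; i < attempts; i++) {
            FileLock lock = channel.tryLock();   // non-blocking; null if held by another process
            if (lock != null) {
                return lock;                     // lock acquired; the task can be created
            }
            Thread.sleep(backoffMs);             // previous owner has not released it yet
        }
        channel.close();
        return null;                             // caller reports the failure and retries later
    }
}
```

This is why the warning is usually transient: once the old owner of task 1_21 finishes closing its state stores, a later retry succeeds and the rebalance completes.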



On 24 April 2017 at 16:02, Eno Thereska <en...@gmail.com> wrote:

> [Earlier quote chain snipped; the messages appear in full in the preceding
> posts of this thread.]
> >> process(SinkNode.java:79)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>> at
> >>> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$
> >> KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(
> >> ProcessorNode.java:48)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> >> measureLatencyNs(StreamsMetricsImpl.java:188)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(
> >> ProcessorNode.java:134)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> SourceNode.process(SourceNode.java:70)
> >>> at
> >>> org.apache.kafka.streams.processor.internals.
> >> StreamTask.process(StreamTask.java:197)
> >>> ... 2 common frames omitted
> >>> Caused by: org.apache.kafka.common.errors.
> NotLeaderForPartitionException
> >> :
> >>> This server is not the leader for that topic-partition.
> >>>
> >>> Are we potentially doing something wrong with our streams
> >>> configuration/usage? Or does this look like a bug?
> >>>
> >>> Thanks,
> >>> Ian.
> >>>
> >>> [1] https://github.com/confluentinc/kafka-connect-elasticsearch
> >>
> >>
>
>

Re: Stream applications dying on broker ISR change

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

In KIP-62 a background heartbeat thread was introduced to handle group-protocol arrivals and departures. The session.timeout.ms setting bounds how long the group coordinator waits for heartbeats from that background thread. If a stream thread dies, its heartbeat thread dies with it, the session times out, and the member is correctly removed from the group.
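In other words, the two timeouts detect different failure modes: session.timeout.ms catches a dead process, while max.poll.interval.ms catches a live process that has stopped calling poll(). A minimal, self-contained sketch of the relevant settings (plain string keys on java.util.Properties; the session timeout value here is illustrative, not a recommendation):

```java
import java.util.Properties;

// Sketch of the two liveness timeouts discussed above.
// session.timeout.ms: how long the coordinator waits for heartbeats from
//   the background thread before evicting the member (detects a dead process).
// max.poll.interval.ms: maximum gap between poll() calls before the member
//   is considered stuck (detects a wedged processing loop).
public class TimeoutConfig {
    public static Properties streamsTimeouts() {
        Properties props = new Properties();
        props.put("session.timeout.ms", "10000");  // illustrative value
        props.put("max.poll.interval.ms",
                  String.valueOf(Integer.MAX_VALUE));  // as quoted above
        return props;
    }

    public static void main(String[] args) {
        System.out.println(TimeoutConfig.streamsTimeouts());
    }
}
```

So even with max.poll.interval.ms effectively disabled, a crashed or stopped instance is still evicted once its heartbeats stop arriving.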

Eno

> On 24 Apr 2017, at 15:34, Sachin Mittal <sj...@gmail.com> wrote:
> 
> I had a question about this setting
> ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_
> VALUE)
> 
> How would the broker know if a thread has died or say we simply stopped an
> instance and needs to be booted out of the group.
> 
> Thanks
> Sachin
> 


Re: Stream applications dying on broker ISR change

Posted by Sachin Mittal <sj...@gmail.com>.
I had a question about this setting
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_
VALUE)

How would the broker know that a thread has died, or that we simply stopped
an instance, and that it needs to be booted out of the group?

Thanks
Sachin


On Mon, Apr 24, 2017 at 5:55 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Ian,
>
>
> This is now fixed in 0.10.2.1. The default configuration needs tweaking. If
> you can't pick that up (it's currently being voted on), make sure you have
> these two parameters set as follows in your streams config:
>
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10 from
> default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
> from default of 300 s
>
> Thanks
> Eno

Re: Stream applications dying on broker ISR change

Posted by Ian Duffy <ia...@ianduffy.ie>.
Awesome! Thank you Eno. I had a look over the release notes a while back and
was rather hoping that would be the answer.

Any idea how long it takes for a kafka-connect update to land after a new
kafka-client release passes voting?

On 24 April 2017 at 13:25, Eno Thereska <en...@gmail.com> wrote:

> Hi Ian,
>
>
> This is now fixed in 0.10.2.1. The default configuration needs tweaking. If
> you can't pick that up (it's currently being voted on), make sure you have
> these two parameters set as follows in your streams config:
>
> final Properties props = new Properties();
> ...
> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10 from
> default of 0
> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
> from default of 300 s
>
> Thanks
> Eno

Re: Stream applications dying on broker ISR change

Posted by Eno Thereska <en...@gmail.com>.
Hi Ian,


This is now fixed in 0.10.2.1. The default configuration needs tweaking. If you can't pick that up (it's currently being voted on), make sure you have these two parameters set as follows in your streams config:

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10 from default of 0
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity from default of 300 s
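To see why raising RETRIES_CONFIG matters here: during a leader election the producer's sends fail with a transient NotLeaderForPartition error until the client refreshes its metadata, and with retries > 0 the send simply succeeds on a later attempt instead of killing the stream thread. A self-contained sketch of that retry behaviour (sendOnce is a hypothetical stand-in for a producer send, not a Kafka API; here it fails twice before "the new leader is known"):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a transient "not the leader" failure is survivable when the
// caller retries. The first two attempts throw; the third succeeds.
public class RetrySketch {
    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical stand-in for a single produce attempt.
    static String sendOnce() {
        if (attempts.incrementAndGet() < 3)
            throw new IllegalStateException(
                "This server is not the leader for that topic-partition.");
        return "ack";
    }

    // Retry up to `retries` additional times, rethrowing the last failure.
    static String sendWithRetries(int retries) {
        RuntimeException last = null;
        for (int i = 0; i <= retries; i++) {
            try { return sendOnce(); }
            catch (RuntimeException e) { last = e; }
        }
        throw last;
    }

    public static void main(String[] args) {
        System.out.println(sendWithRetries(10)); // prints "ack"
    }
}
```

With retries at the old default of 0, the equivalent call would rethrow on the first failure, which matches the StreamsException stack traces in the original report.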

Thanks
Eno

> On 24 Apr 2017, at 10:38, Ian Duffy <ia...@ianduffy.ie> wrote:
> 
> Hi All,
> 
> We're running multiple Kafka Stream applications using Kafka client
> 0.10.2.0 against a 6 node broker cluster running 0.10.1.1
> Additionally, we're running Kafka Connect 0.10.2.0 with the ElasticSearch
> connector by confluent [1]
> 
> On an ISR change occurring on the brokers, all of the streams applications
> and the Kafka connect ES connector threw exceptions and never recovered.
> 
> We've seen a correlation between Kafka Broker ISR change and stream
> applications dying.
> 
> The logs from the streams applications throw out the following and fail to
> recover:
> 
> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,323 - [WARN] - [1.1.0-6] - [StreamThread-1]
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING
> to NOT_RUNNING
> 07:01:23.323 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,324 - [ERROR] - [1.1.0-6] - [StreamThread-1] Application -
> Unexpected Exception caught in thread [StreamThread-1]:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_81, processor=KSTREAM-SOURCE-0000000000,
> topic=kafka-topic, partition=81, offset=479285
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_81]
> exception caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> ... 2 common frames omitted
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,558 - [WARN] - [1.1.0-6] - [StreamThread-3]
> o.a.k.s.p.internals.StreamThread - Unexpected state transition from RUNNING
> to NOT_RUNNING
> 07:01:23.558 stream-processor /var/log/application.log  2017-04-24
> 06:01:23,559 - [ERROR] - [1.1.0-6] - [StreamThread-3] Application -
> Unexpected Exception caught in thread [StreamThread-3]:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=0_55, processor=KSTREAM-SOURCE-0000000000,
> topic=kafka-topic, partition=55, offset=479308
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55]
> exception caught when producing
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
> at
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
> at
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:43)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
> ... 2 common frames omitted
> Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException:
> This server is not the leader for that topic-partition.
> 
> Are we potentially doing something wrong with our streams
> configuration/usage? Or does this look like a bug?
> 
> Thanks,
> Ian.
> 
> [1] https://github.com/confluentinc/kafka-connect-elasticsearch