Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2017/04/01 02:28:32 UTC

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Should this timeout be less than the max poll interval value? If yes, then
generally speaking, what should be the ratio between the two, or the range
for this timeout value?

Thanks
Sachin

On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io> wrote:

Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG


-Matthias

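For readers following along, here is a minimal sketch of the change suggested
above, assuming the producer settings go into the same Properties object that
configures the Streams application (as in the retries example quoted further
down). The plain string keys are the values of the Kafka constants named in the
comments, so the sketch compiles without Kafka on the classpath. The class
name, the application id, and the 120000 ms figure are illustrative
assumptions, not values taken from this thread. The 30001 ms in the trace below
is consistent with the producer's default request.timeout.ms of 30000 ms.

```java
import java.util.Properties;

// Sketch only: string keys stand in for the Kafka config constants.
class ProducerTimeoutSketch {

    // "application.id"     is StreamsConfig.APPLICATION_ID_CONFIG
    // "retries"            is ProducerConfig.RETRIES_CONFIG
    // "request.timeout.ms" is ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
    static Properties buildProps(String applicationId, int requestTimeoutMs) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        // Keep retrying produce requests, as suggested earlier in the thread.
        props.put("retries", Integer.MAX_VALUE);
        // Raise the request timeout above the 30000 ms default.
        props.put("request.timeout.ms", requestTimeoutMs);
        return props;
    }

    public static void main(String[] args) {
        // 120000 ms is an arbitrary illustrative value, not a recommendation.
        Properties props = buildProps("my-streams-app", 120_000);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting Properties would then be passed to the Streams application in the
usual way, alongside the bootstrap servers and serde settings.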

On 3/31/17 11:32 AM, Sachin Mittal wrote:
> Hi,
> So I have added the config ProducerConfig.RETRIES_CONFIG,
> Integer.MAX_VALUE
> and the NotLeaderForPartitionException is gone.
>
> However we see a new exception especially under heavy load:
> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
>
> So, any idea why the TimeoutException is happening?
> Is this controlled by
> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>
> If yes, what should the value be, given that our consumer
> max.poll.interval.ms is the default 5 minutes?
>
> Is there any other setting we should try in order to avoid such errors,
> which cause the stream thread to die?
>
> Thanks
> Sachin
>
>
> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <en...@gmail.com>
> wrote:
>
>> Hi Sachin,
>>
>> Not in this case.
>>
>> Thanks
>> Eno
>>
>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com> wrote:
>>>
>>> OK.
>>> I will try this out.
>>>
>>> Do I need to change anything for
>>> max.in.flight.requests.per.connection
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <en...@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>> This server is not the leader for that topic-partition.”, could you try
>>>> setting the number of retries to something large like this:
>>>>
>>>> Properties props = new Properties();
>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>> ...
>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>
>>>> This will retry the produce requests and should hopefully solve your
>>>> immediate problem.
>>>>
>>>> Thanks
>>>> Eno
>>>>
>>>>
>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com> wrote:
>>>>
>>>>    Hi,
>>>>    We have encountered another series of errors which I would need
>>>>    more help in understanding.
>>>>
>>>>    In the logs we see a message like this:
>>>>    ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]:
>>>>    org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1]
>>>>    Error sending record to topic new-part-advice-key-table-changelog. No
>>>>    more offsets will be recorded for this task and the exception will
>>>>    eventually be thrown
>>>>
>>>>    then some milliseconds later
>>>>    ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>>    org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
>>>>    [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>>>>    org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>    org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>
>>>>    finally we get this
>>>>    ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>>>>    org.apache.kafka.streams.errors.StreamsException: Exception caught in
>>>>    process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
>>>>    topic=advice-stream, partition=1, offset=48062286
>>>>        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>    Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>
>>>>
>>>>    Again, it is not clear why in this case we need to shut down the streams
>>>>    thread and eventually the application. Shouldn't we capture this error
>>>>    too?
>>>>
>>>>    Thanks
>>>>    Sachin
>>>>
>>>>
>>>>
>>>>
>>
>>
>

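On the question of what happens when a stream thread dies: the "Uncaught
exception" log line in the original report suggests the application already
registers a handler for this. As a rough illustration of the mechanism only,
the sketch below uses a plain java.lang.Thread rather than Kafka Streams. The
class name, thread name, and exception are stand-ins chosen for this example.
KafkaStreams offers a setUncaughtExceptionHandler method that takes the same
Thread.UncaughtExceptionHandler type.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: a plain thread stands in for a Kafka Streams thread.
class UncaughtHandlerSketch {

    // Starts a worker that fails and returns what the handler observed.
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            // Stand-in for a fatal producer error surfacing in the thread.
            throw new IllegalStateException("task [0_1] exception caught when producing");
        }, "StreamThread-sketch");
        // The handler runs on the dying thread. It cannot resume that thread;
        // it can only react, for example by logging or alerting.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + runAndCapture());
    }
}
```

The handler only observes the failure. Avoiding the failure in the first place
is what the retries and timeout settings discussed in this thread are for.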
Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Mahendra Kariya <ma...@go-jek.com>.
Yeah. Quite possible. Completely missed this possibility. What I simply did
was to download and add the kafka-streams jar as a dependency. I didn't
update the downstream dependencies. My bad!

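A NoSuchMethodError like the one quoted below usually points at mismatched jar
versions, here most likely a newer kafka-streams jar running against an older
kafka-clients jar. One way to check, sketched under that assumption, is to
print where the relevant classes are loaded from. The class names come from the
stack trace in this thread; the helper class and method names are made up for
this example.

```java
import java.security.CodeSource;

// Sketch only: reports which jar (if any) a class is loaded from.
class JarOriginSketch {

    static String originOf(String className) {
        try {
            // Load without initializing, so static initializers do not run.
            Class<?> c = Class.forName(className, false,
                    JarOriginSketch.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return "(JDK/bootstrap class, no jar location)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.clients.Metadata",
            "org.apache.kafka.streams.KafkaStreams"
        };
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name));
        }
    }
}
```

If the two classes resolve to jars with different version numbers, aligning the
kafka-clients dependency with the kafka-streams version is the likely fix.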
On Tue, Apr 18, 2017 at 7:42 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Mahendra,
>
> I see the java.lang.NoSuchMethodError: org.apache.kafka.clients... error.
> Looks like some jars aren't in the classpath?
>
> Eno
>
> > On 18 Apr 2017, at 12:46, Mahendra Kariya <ma...@go-jek.com> wrote:
> >
> > Hey Eno,
> >
> > I just pulled the latest jar from the link you shared and tried to run my
> > code. I am getting the following exception on new KafkaStreams(). The same
> > code is working fine with 0.10.2.0 jar.
> >
> >
> > Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> >        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
> >        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
> >        at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
> >        at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:316)
> >        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:358)
> >        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:279)
> > Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
> >        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:98)
> >        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:82)
> >        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
> >        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:254)
> >        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:220)
> >        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
> >        ... 6 more
> >
> >
> >
> > On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
> >
> >> Thanks!
> >>
> >> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska <en...@gmail.com> wrote:
> >>
> >>> The RC candidate build is here:
> >>> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
> >>>
> >>> Eno
> >>>> On 17 Apr 2017, at 17:20, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
> >>>>
> >>>> Thanks!
> >>>>
> >>>> In the meantime, is the jar published somewhere on github or as a part of
> >>>> build pipeline?
> >>>>
> >>>> On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >>>>
> >>>>> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this week.
> >>>>>
> >>>>> Eno
> >>>>>> On 17 Apr 2017, at 13:25, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
> >>>>>>
> >>>>>> Are the bug fix releases published to Maven central repo?
> >>>>>>
> >>>>>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi Sachin,
> >>>>>>>
> >>>>>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >>>>>>> max.poll.interval to infinite since from our experience with streams this
> >>>>>>> should not be something that users set:
> >>>>>>> https://github.com/apache/kafka/pull/2770/files
> >>>>>>>
> >>>>>>> We're in the process of documenting that change. For now you can increase
> >>>>>>> the request timeout without worrying about max.poll.interval anymore. In
> >>>>>>> fact I'd suggest you also increase max.poll.interval as we've done it above.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Eno
> >>>>>>>

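For anyone still on a build without the change Eno describes in the quoted
thread above, a minimal sketch of raising the consumer poll interval by hand
follows. It assumes that "infinite" corresponds to the largest int value, since
max.poll.interval.ms is an integer number of milliseconds. The class and method
names are illustrative, and the string key is the value of
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG.

```java
import java.util.Properties;

// Sketch only: string key stands in for the Kafka consumer constant.
class MaxPollIntervalSketch {

    // Returns a copy of the given settings with max.poll.interval.ms raised
    // to the largest int value; the input object is left unchanged.
    static Properties withLargePollInterval(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("max.poll.interval.ms", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "my-streams-app"); // illustrative id
        Properties props = withLargePollInterval(base);
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

With the poll interval out of the way, the request timeout can be tuned on its
own, which is the point Eno makes in his reply.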
Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Eno Thereska <en...@gmail.com>.
Hi Mahendra,

I see the java.lang.NoSuchMethodError: org.apache.kafka.clients... error. Looks like some jars aren't in the classpath?

Eno

> On 18 Apr 2017, at 12:46, Mahendra Kariya <ma...@go-jek.com> wrote:
> 
> Hey Eno,
> 
> I just pulled the latest jar from the link you shared and tried to run my
> code. I am getting the following exception on new KafkaStreams(). The same
> code is working fine with 0.10.2.0 jar.
> 
> 
> Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
>        at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
>        at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:316)
>        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:358)
>        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:279)
> Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
>        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:98)
>        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:82)
>        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
>        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:254)
>        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:220)
>        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
>        ... 6 more
> 
> 
> 
> On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
>
>> Thanks!
>>
>> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska <en...@gmail.com> wrote:
>>
>>> The RC candidate build is here:
>>> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
>>>
>>> Eno
>>>> On 17 Apr 2017, at 17:20, Mahendra Kariya <ma...@go-jek.com> wrote:
>>>>
>>>> Thanks!
>>>>
>>>> In the meantime, is the jar published somewhere on github or as a part of
>>>> build pipeline?
>>>>
>>>> On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <en...@gmail.com> wrote:
>>>>
>>>>> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this week.
>>>>>
>>>>> Eno
>>>>>> On 17 Apr 2017, at 13:25, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
>>>>>>
>>>>>> Are the bug fix releases published to Maven central repo?
>>>>>>
>>>>>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Sachin,
>>>>>>>
>>>>>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>>>>>>> max.poll.interval to infinite since from our experience with streams this
>>>>>>> should not be something that users set:
>>>>>>> https://github.com/apache/kafka/pull/2770/files
>>>>>>>
>>>>>>> We're in the process of documenting that change. For now you can increase
>>>>>>> the request timeout without worrying about max.poll.interval anymore. In
>>>>>>> fact I'd suggest you also increase max.poll.interval as we've done it above.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Should this timeout be less than the max poll interval value? If yes, then
>>>>>>>> generally speaking, what should be the ratio between the two, or the range
>>>>>>>> for this timeout value?
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Sachin
>>>>>>>> 
>>>>>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io>
>>> wrote:
>>>>>>>> 
>>>>>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>>>>>>>> 
>>>>>>>> 
>>>>>>>> -Matthias
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>>>>>>>>> Hi,
>>>>>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
>>>>>>>> Integer.MAX_VALUE
>>>>>>>>> and the NotLeaderForPartitionException is gone.
>>>>>>>>> 
>>>>>>>>> However we see a new exception especially under heavy load:
>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
>>>>> exception
>>>>>>>>> caught when producing
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>>>>>>>> checkForException(RecordCollectorImpl.java:119)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>> RecordCollectorImpl.flush(
>>>>>>>> RecordCollectorImpl.java:127)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>> StreamTask$1.run(StreamTask.
>>>>>>>> java:76)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>>>>>>>> measureLatencyNs(StreamsMetricsImpl.java:188)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>> StreamTask.commit(StreamTask.
>>>>>>>> java:280)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>> StreamThread.commitOne(
>>>>>>>> StreamThread.java:787)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>> StreamThread.commitAll(
>>>>>>>> StreamThread.java:774)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>> StreamThread.maybeCommit(
>>>>>>>> StreamThread.java:749)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> at
>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>>>>>>>> StreamThread.java:671)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>> StreamThread.run(StreamThread.java:378)
>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1
>>> record(s)
>>>>>>> for
>>>>>>>>> new-part-advice-key-table-changelog-1: 30001 ms has passed since
>>> last
>>>>>>>> append
>>>>>>>>> 
>>>>>>>>> So, any idea why the TimeoutException is happening?
>>>>>>>>> Is this controlled by
>>>>>>>>> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>>>>>>>>>
>>>>>>>>> If yes, what should the value be, given that our consumer
>>>>>>>>> max.poll.interval.ms is the default 5 minutes?
>>>>>>>>>
>>>>>>>>> Is there any other setting we should try in order to avoid such errors,
>>>>>>>>> which cause the stream thread to die?
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <
>>> eno.thereska@gmail.com
>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Sachin,
>>>>>>>>>> 
>>>>>>>>>> Not in this case.
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> Eno
>>>>>>>>>> 
>>>>>>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> OK.
>>>>>>>>>>> I will try this out.
>>>>>>>>>>> 
>>>>>>>>>>> Do I need to change anything for
>>>>>>>>>>> max.in.flight.requests.per.connection
>>>>>>>>>>> 
>>>>>>>>>>> Thanks
>>>>>>>>>>> Sachin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <
>>>>>>> eno.thereska@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Sachin,
>>>>>>>>>>>> 
>>>>>>>>>>>> For this particular error, “org.apache.kafka.common.errors.
>>>>>>>>>>>> NotLeaderForPartitionException: This server is not the leader
>>> for
>>>>>>> that
>>>>>>>>>>>> topic-partition.”, could you try setting the number of retries
>>> to
>>>>>>>>>> something
>>>>>>>>>>>> large like this:
>>>>>>>>>>>> 
>>>>>>>>>>>> Properties props = new Properties();
>>>>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>>>>>>>>> ...
>>>>>>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>>>>>>>>> 
>>>>>>>>>>>> This will retry the produce requests and should hopefully solve
>>>>> your
>>>>>>>>>>>> immediate problem.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Eno
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com>
>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> We have encountered another case of series of errors which I
>>> would
>>>>>>>>>> need
>>>>>>>>>>>> more help in understanding.
>>>>>>>>>>>> 
>>>>>>>>>>>> In logs we see message like this:
>>>>>>>>>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
>>>>>>>>>>>> 85-StreamThread-3-producer]:
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>> RecordCollectorImpl
>>>>> -
>>>>>>>>>>>> task
>>>>>>>>>>>> [0_1] Error sending record to topic new-part-advice-key-table-
>>>>>>>>>> changelog.
>>>>>>>>>>>> No
>>>>>>>>>>>> more offsets will be recorded for this task and the exception
>>> will
>>>>>>>>>>>> eventually be thrown
>>>>>>>>>>>> 
>>>>>>>>>>>> then some millisecond later
>>>>>>>>>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread -
>>>>>>>>>>>> stream-thread
>>>>>>>>>>>> [StreamThread-3] Failed while executing StreamTask 0_1 due to
>>>>> flush
>>>>>>>>>>>> state:
>>>>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
>>>>>>>>>> exception
>>>>>>>>>>>> caught when producing
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>> RecordCollectorImpl.
>>>>>>>>>>>> checkForException(RecordCollectorImpl.java:119)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>>>> RecordCollectorImpl.flush(RecordCollectorImpl.java:127)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>> StreamTask.flushState(
>>>>>>>>>>>> StreamTask.java:422)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>> StreamThread$4.apply(
>>>>>>>>>>>> StreamThread.java:555)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>>>>>>>>>>>> performOnAllTasks(StreamThread.java:513)
>>>>>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>>>> StreamThread.flushAllState(StreamThread.java:551)
>>>>>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>>>>>>>>>>>> shutdownTasksAndState(StreamThread.java:463)
>>>>>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>> StreamThread.shutdown(
>>>>>>>>>>>> StreamThread.java:408)
>>>>>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>>>> StreamThread.run(StreamThread.java:389)
>>>>>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException:
>>>>>>> This
>>>>>>>>>>>> server
>>>>>>>>>>>> is not the leader for that topic-partition.
>>>>>>>>>>>> 
>>>>>>>>>>>> finally we get this
>>>>>>>>>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
>>>>>>>>>>>> com.advice.TestKafkaAdvice
>>>>>>>>>>>> - Uncaught exception:
>>>>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception
>>>>> caught
>>>>>>>>>> in
>>>>>>>>>>>> process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
>>>>>>>>>>>> topic=advice-stream, partition=1, offset=48062286
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>>>> StreamTask.process(StreamTask.java:216)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>> StreamThread.runLoop(
>>>>>>>>>>>> StreamThread.java:651)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>>>> StreamThread.run(StreamThread.java:378)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException:
>>> task
>>>>>>>>>>>> [0_1]
>>>>>>>>>>>> exception caught when producing
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>> RecordCollectorImpl.
>>>>>>>>>>>> checkForException(RecordCollectorImpl.java:119)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>> RecordCollectorImpl.send(
>>>>>>>>>>>> RecordCollectorImpl.java:76)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>>    at
>>>>>>>>>>>> org.apache.kafka.streams.processor.internals.
>>>>>>>>>> RecordCollectorImpl.send(
>>>>>>>>>>>> RecordCollectorImpl.java:64)
>>>>>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> Again it is not clear why in this case we need to shut down the
>>>>>>>>>> streams
>>>>>>>>>>>> thread and eventually the application. Shouldn't we capture this
>>>>>>>>>> error
>>>>>>>>>>>> too?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
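
The settings discussed in the quoted thread above (retries, request timeout, max poll interval) can be sketched as plain properties. This is only an illustration under stated assumptions: the class name StreamsTimeoutSettings, the helper build(), and the 60000 ms value are hypothetical, and Integer.MAX_VALUE stands in for the "infinite" max.poll.interval mentioned in the thread. The string keys are the config names behind StreamsConfig.APPLICATION_ID_CONFIG, ProducerConfig.RETRIES_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG and the consumer's max.poll.interval.ms, so the sketch compiles without the Kafka jars.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: collects the settings named in the thread.
class StreamsTimeoutSettings {

    static Properties build(String applicationId, int requestTimeoutMs) {
        Properties props = new Properties();
        // Same key as StreamsConfig.APPLICATION_ID_CONFIG
        props.put("application.id", applicationId);
        // Same key as ProducerConfig.RETRIES_CONFIG; retry produce requests
        // so a leader change does not immediately fail the send
        props.put("retries", Integer.MAX_VALUE);
        // Same key as ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG; the value is an
        // assumption chosen for illustration, not a recommendation from the thread
        props.put("request.timeout.ms", requestTimeoutMs);
        // Consumer max.poll.interval.ms; Integer.MAX_VALUE approximates "infinite"
        props.put("max.poll.interval.ms", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("demo-app", 60000);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

The resulting Properties object would then be passed to the streams configuration in place of the props shown in the quoted snippet; which timeout value suits a given workload has to be found by testing under load.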


Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Mahendra Kariya <ma...@go-jek.com>.
Hey Eno,

I just pulled the latest jar from the link you shared and tried to run my
code. I am getting the following exception on new KafkaStreams(). The same
code is working fine with 0.10.2.0 jar.


Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566)
        at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38)
        at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:316)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:358)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:279)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;Ljava/util/Set;J)V
        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:98)
        at org.apache.kafka.streams.processor.internals.StreamsKafkaClient.<init>(StreamsKafkaClient.java:82)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.configure(StreamPartitionAssignor.java:219)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:254)
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:220)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:673)
        ... 6 more
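
A NoSuchMethodError generally means a class was compiled against one version of a library and is running against another. Here the caller is in kafka-streams (StreamsKafkaClient) and the missing method is in kafka-clients (Metadata.update), so one possible cause, stated as an assumption rather than a diagnosis, is that the new kafka-streams jar is being combined with an older kafka-clients jar on the classpath. A minimal sketch for checking which jar each class is actually loaded from (the class name JarLocator and its methods are hypothetical helpers, not part of Kafka):

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports the jar or directory a class is loaded from.
class JarLocator {

    // Returns the code source location of a class, or a marker when none is available
    // (for example, classes loaded by the bootstrap class loader).
    static String locate(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    // Looks a class up by name without initializing it, so the check itself
    // does not trigger static initializers in the library under inspection.
    static String locateByName(String className) {
        try {
            Class<?> type = Class.forName(className, false, JarLocator.class.getClassLoader());
            return locate(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace above.
        System.out.println("kafka-clients: "
                + locateByName("org.apache.kafka.clients.Metadata"));
        System.out.println("kafka-streams: "
                + locateByName("org.apache.kafka.streams.processor.internals.StreamsKafkaClient"));
    }
}
```

Run with the same classpath as the application. If the two printed locations point at jars with different version numbers, aligning them would be the first thing to try.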



On Tue, Apr 18, 2017 at 5:47 AM, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:

> Thanks!
>
> On Tue, Apr 18, 2017, 12:26 AM Eno Thereska <en...@gmail.com>
> wrote:
>
>> The RC candidate build is here:
>> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
>>
>> Eno
>> > On 17 Apr 2017, at 17:20, Mahendra Kariya <ma...@go-jek.com>
>> wrote:
>> >
>> > Thanks!
>> >
>> > In the meantime, is the jar published somewhere on github or as a part
>> of
>> > build pipeline?
>> >
>> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <en...@gmail.com>
>> > wrote:
>> >
>> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
>> week.
>> >>
>> >> Eno
>> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya <mahendra.kariya@go-jek.com> wrote:
>> >>>
>> >>> Are the bug fix releases published to Maven central repo?
>> >>>
>> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
>> >>>
>> >>>> Hi Sachin,
>> >>>>
>> >>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>> >>>> max.poll.interval to infinite since from our experience with streams
>> >> this
>> >>>> should not be something that users set:
>> >>>> https://github.com/apache/kafka/pull/2770/files.
>> >>>>
>> >>>> We're in the process of documenting that change. For now you can
>> >> increase
>> >>>> the request timeout without worrying about max.poll.interval
>> anymore. In
>> >>>> fact I'd suggest you also increase max.poll.interval as we've done it
>> >> above.
>> >>>>
>> >>>> Thanks
>> >>>> Eno
>> >>>>
>> >>>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
>> >>>>>
>> >>>>> Should this timeout be less than the max poll interval value? If yes,
>> >>>>> then generally speaking, what should the ratio between the two be, or
>> >>>>> what range should this timeout value fall in?
>> >>>>>
>> >>>>> Thanks
>> >>>>> Sachin
>> >>>>>
>> >>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io>
>> wrote:
>> >>>>>
>> >>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>> >>>>>
>> >>>>>
>> >>>>> -Matthias
>> >>>>>
>> >>>>>
>> >>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>> >>>>>> Hi,
>> >>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
>> >>>>> Integer.MAX_VALUE
>> >>>>>> and the NotLeaderForPartitionException is gone.
>> >>>>>>
>> >>>>>> However we see a new exception especially under heavy load:
>> >>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
>> >> exception
>> >>>>>> caught when producing
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
>> >>>>> checkForException(RecordCollectorImpl.java:119)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> >> RecordCollectorImpl.flush(
>> >>>>> RecordCollectorImpl.java:127)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> >>>> StreamTask$1.run(StreamTask.
>> >>>>> java:76)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>> >>>>> measureLatencyNs(StreamsMetricsImpl.java:188)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> >>>> StreamTask.commit(StreamTask.
>> >>>>> java:280)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> StreamThread.commitOne(
>> >>>>> StreamThread.java:787)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> StreamThread.commitAll(
>> >>>>> StreamThread.java:774)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> >> StreamThread.maybeCommit(
>> >>>>> StreamThread.java:749)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> at
>> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
>> >>>>> StreamThread.java:671)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
>> >>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>> StreamThread.run(StreamThread.java:378)
>> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1
>> record(s)
>> >>>> for
>> >>>>>> new-part-advice-key-table-changelog-1: 30001 ms has passed since
>> last
>> >>>>> append
>> >>>>>>
>> >>>>>> So any idea as why TimeoutException is happening.
>> >>>>>> Is this controlled by
>> >>>>>> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>> >>>>>>
>> >>>>>> If yes
>> >>>>>> What should this value be set to, given that our consumer
>> >>>>>> max.poll.interval.ms defaults to 5 minutes?
>> >>>>>>
>> >>>>>> Is there any other setting that we should try to avoid such errors
>> >> which
>> >>>>>> causes stream thread to die.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>> Sachin
>> >>>>>>
>> >>>>>>
>> >>>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <
>> eno.thereska@gmail.com
>> >>>
>> >>>>>> wrote:
>> >>>>>>
>> >>>>>>> Hi Sachin,
>> >>>>>>>
>> >>>>>>> Not in this case.
>> >>>>>>>
>> >>>>>>> Thanks
>> >>>>>>> Eno
>> >>>>>>>
>> >>>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com>
>> >>>> wrote:
>> >>>>>>>>
>> >>>>>>>> OK.
>> >>>>>>>> I will try this out.
>> >>>>>>>>
>> >>>>>>>> Do I need to change anything for
>> >>>>>>>> max.in.flight.requests.per.connection
>> >>>>>>>>
>> >>>>>>>> Thanks
>> >>>>>>>> Sachin
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <
>> >>>> eno.thereska@gmail.com>
>> >>>>>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Sachin,
>> >>>>>>>>>
>> >>>>>>>>> For this particular error, “org.apache.kafka.common.errors.
>> >>>>>>>>> NotLeaderForPartitionException: This server is not the leader
>> for
>> >>>> that
>> >>>>>>>>> topic-partition.”, could you try setting the number of retries
>> to
>> >>>>>>> something
>> >>>>>>>>> large like this:
>> >>>>>>>>>
>> >>>>>>>>> Properties props = new Properties();
>> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>> >>>>>>>>> ...
>> >>>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>> >>>>>>>>>
>> >>>>>>>>> This will retry the produce requests and should hopefully solve
>> >> your
>> >>>>>>>>> immediate problem.
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>> Eno
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi,
>> >>>>>>>>> We have encountered another case of series of errors which I
>> would
>> >>>>>>> need
>> >>>>>>>>> more help in understanding.
>> >>>>>>>>>
>> >>>>>>>>> In logs we see message like this:
>> >>>>>>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
>> >>>>>>>>> 85-StreamThread-3-producer]:
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> RecordCollectorImpl
>> >> -
>> >>>>>>>>> task
>> >>>>>>>>> [0_1] Error sending record to topic new-part-advice-key-table-
>> >>>>>>> changelog.
>> >>>>>>>>> No
>> >>>>>>>>> more offsets will be recorded for this task and the exception
>> will
>> >>>>>>>>> eventually be thrown
>> >>>>>>>>>
>> >>>>>>>>> then some millisecond later
>> >>>>>>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread -
>> >>>>>>>>> stream-thread
>> >>>>>>>>> [StreamThread-3] Failed while executing StreamTask 0_1 due to
>> >> flush
>> >>>>>>>>> state:
>> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
>> >>>>>>> exception
>> >>>>>>>>> caught when producing
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> RecordCollectorImpl.
>> >>>>>>>>> checkForException(RecordCollectorImpl.java:119)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>>>> RecordCollectorImpl.flush(RecordCollectorImpl.java:127)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>> StreamTask.flushState(
>> >>>>>>>>> StreamTask.java:422)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >> StreamThread$4.apply(
>> >>>>>>>>> StreamThread.java:555)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>> >>>>>>>>> performOnAllTasks(StreamThread.java:513)
>> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>>>> StreamThread.flushAllState(StreamThread.java:551)
>> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>> >>>>>>>>> shutdownTasksAndState(StreamThread.java:463)
>> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>> StreamThread.shutdown(
>> >>>>>>>>> StreamThread.java:408)
>> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>>>> StreamThread.run(StreamThread.java:389)
>> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException:
>> >>>> This
>> >>>>>>>>> server
>> >>>>>>>>> is not the leader for that topic-partition.
>> >>>>>>>>>
>> >>>>>>>>> finally we get this
>> >>>>>>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
>> >>>>>>>>> com.advice.TestKafkaAdvice
>> >>>>>>>>> - Uncaught exception:
>> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception
>> >> caught
>> >>>>>>> in
>> >>>>>>>>> process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
>> >>>>>>>>> topic=advice-stream, partition=1, offset=48062286
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>>>> StreamTask.process(StreamTask.java:216)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >> StreamThread.runLoop(
>> >>>>>>>>> StreamThread.java:651)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>>>> StreamThread.run(StreamThread.java:378)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException:
>> task
>> >>>>>>>>> [0_1]
>> >>>>>>>>> exception caught when producing
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> RecordCollectorImpl.
>> >>>>>>>>> checkForException(RecordCollectorImpl.java:119)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>> RecordCollectorImpl.send(
>> >>>>>>>>> RecordCollectorImpl.java:76)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.kafka.streams.processor.internals.
>> >>>>>>> RecordCollectorImpl.send(
>> >>>>>>>>> RecordCollectorImpl.java:64)
>> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Again it is not clear why in this case we need to shut down the
>> >>>>>>> streams
>> >>>>>>>>> thread and eventually the application. Shouldn't we capture this
>> >>>>>>> error
>> >>>>>>>>> too?
>> >>>>>>>>>
>> >>>>>>>>> Thanks
>> >>>>>>>>> Sachin
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>
>> >>>>
>> >>
>> >>
>>
>>

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Mahendra Kariya <ma...@go-jek.com>.
Thanks!

On Tue, Apr 18, 2017, 12:26 AM Eno Thereska <en...@gmail.com> wrote:

> The RC candidate build is here:
> http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/
>
> Eno
> > On 17 Apr 2017, at 17:20, Mahendra Kariya <ma...@go-jek.com>
> wrote:
> >
> > Thanks!
> >
> > In the meantime, is the jar published somewhere on github or as a part of
> > build pipeline?
> >
> > On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <en...@gmail.com>
> > wrote:
> >
> >> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this
> week.
> >>
> >> Eno
> >>> On 17 Apr 2017, at 13:25, Mahendra Kariya <ma...@go-jek.com>
> >> wrote:
> >>>
> >>> Are the bug fix releases published to Maven central repo?
> >>>
> >>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <en...@gmail.com>
> >>> wrote:
> >>>
> >>>> Hi Sachin,
> >>>>
> >>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >>>> max.poll.interval to infinite since from our experience with streams
> >> this
> >>>> should not be something that users set:
> >>>> https://github.com/apache/kafka/pull/2770/files.
> >>>>
> >>>> We're in the process of documenting that change. For now you can
> >> increase
> >>>> the request timeout without worrying about max.poll.interval anymore.
> In
> >>>> fact I'd suggest you also increase max.poll.interval as we've done it
> >> above.
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>
> >>>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
> >>>>>
> >>>>> Should this timeout be less than the max poll interval value? If yes,
> >>>>> then generally speaking, what should the ratio between the two be, or
> >>>>> what range should this timeout value fall in?
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>>
> >>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io>
> wrote:
> >>>>>
> >>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>>
> >>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >>>>>> Hi,
> >>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG,
> >>>>> Integer.MAX_VALUE
> >>>>>> and the NotLeaderForPartitionException is gone.
> >>>>>>
> >>>>>> However we see a new exception especially under heavy load:
> >>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
> >> exception
> >>>>>> caught when producing
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> >>>>> checkForException(RecordCollectorImpl.java:119)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >> RecordCollectorImpl.flush(
> >>>>> RecordCollectorImpl.java:127)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamTask$1.run(StreamTask.
> >>>>> java:76)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> >>>>> measureLatencyNs(StreamsMetricsImpl.java:188)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.commit(StreamTask.
> >>>>> java:280)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(
> >>>>> StreamThread.java:787)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(
> >>>>> StreamThread.java:774)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >> StreamThread.maybeCommit(
> >>>>> StreamThread.java:749)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> at
> >>>>>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> >>>>> StreamThread.java:671)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]        at
> >>>>>> org.apache.kafka.streams.processor.internals.
> >>>>> StreamThread.run(StreamThread.java:378)
> >>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1
> record(s)
> >>>> for
> >>>>>> new-part-advice-key-table-changelog-1: 30001 ms has passed since
> last
> >>>>> append
> >>>>>>
> >>>>>> So any idea as why TimeoutException is happening.
> >>>>>> Is this controlled by
> >>>>>> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>>>>>
> >>>>>> If yes
> >>>>>> What should this value be set to, given that our consumer
> >>>>>> max.poll.interval.ms defaults to 5 minutes?
> >>>>>>
> >>>>>> Is there any other setting that we should try to avoid such errors
> >> which
> >>>>>> causes stream thread to die.
> >>>>>>
> >>>>>> Thanks
> >>>>>> Sachin
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <
> eno.thereska@gmail.com
> >>>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Sachin,
> >>>>>>>
> >>>>>>> Not in this case.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Eno
> >>>>>>>
> >>>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>> OK.
> >>>>>>>> I will try this out.
> >>>>>>>>
> >>>>>>>> Do I need to change anything for
> >>>>>>>> max.in.flight.requests.per.connection
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>> Sachin
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <
> >>>> eno.thereska@gmail.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Sachin,
> >>>>>>>>>
> >>>>>>>>> For this particular error, “org.apache.kafka.common.errors.
> >>>>>>>>> NotLeaderForPartitionException: This server is not the leader for
> >>>> that
> >>>>>>>>> topic-partition.”, could you try setting the number of retries to
> >>>>>>> something
> >>>>>>>>> large like this:
> >>>>>>>>>
> >>>>>>>>> Properties props = new Properties();
> >>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>>>>>>>> ...
> >>>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >>>>>>>>>
> >>>>>>>>> This will retry the produce requests and should hopefully solve
> >> your
> >>>>>>>>> immediate problem.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Eno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com>
> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>> We have encountered another case of series of errors which I
> would
> >>>>>>> need
> >>>>>>>>> more help in understanding.
> >>>>>>>>>
> >>>>>>>>> In logs we see message like this:
> >>>>>>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread |
> >>>>>>>>> 85-StreamThread-3-producer]:
> >>>>>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl
> >> -
> >>>>>>>>> task
> >>>>>>>>> [0_1] Error sending record to topic new-part-advice-key-table-
> >>>>>>> changelog.
> >>>>>>>>> No
> >>>>>>>>> more offsets will be recorded for this task and the exception
> will
> >>>>>>>>> eventually be thrown
> >>>>>>>>>
> >>>>>>>>> then some millisecond later
> >>>>>>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread -
> >>>>>>>>> stream-thread
> >>>>>>>>> [StreamThread-3] Failed while executing StreamTask 0_1 due to
> >> flush
> >>>>>>>>> state:
> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1]
> >>>>>>> exception
> >>>>>>>>> caught when producing
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> >>>>>>>>> checkForException(RecordCollectorImpl.java:119)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>> RecordCollectorImpl.flush(RecordCollectorImpl.java:127)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamTask.flushState(
> >>>>>>>>> StreamTask.java:422)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >> StreamThread$4.apply(
> >>>>>>>>> StreamThread.java:555)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
> >>>>>>>>> performOnAllTasks(StreamThread.java:513)
> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>> StreamThread.flushAllState(StreamThread.java:551)
> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
> >>>>>>>>> shutdownTasksAndState(StreamThread.java:463)
> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>> StreamThread.shutdown(
> >>>>>>>>> StreamThread.java:408)
> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>> StreamThread.run(StreamThread.java:389)
> >>>>>>>>> [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException:
> >>>> This
> >>>>>>>>> server
> >>>>>>>>> is not the leader for that topic-partition.
> >>>>>>>>>
> >>>>>>>>> finally we get this
> >>>>>>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]:
> >>>>>>>>> com.advice.TestKafkaAdvice
> >>>>>>>>> - Uncaught exception:
> >>>>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception
> >> caught
> >>>>>>> in
> >>>>>>>>> process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
> >>>>>>>>> topic=advice-stream, partition=1, offset=48062286
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>> StreamTask.process(StreamTask.java:216)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >> StreamThread.runLoop(
> >>>>>>>>> StreamThread.java:651)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>>>> StreamThread.run(StreamThread.java:378)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task
> >>>>>>>>> [0_1]
> >>>>>>>>> exception caught when producing
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.
> >>>>>>>>> checkForException(RecordCollectorImpl.java:119)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>> RecordCollectorImpl.send(
> >>>>>>>>> RecordCollectorImpl.java:76)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>     at
> >>>>>>>>> org.apache.kafka.streams.processor.internals.
> >>>>>>> RecordCollectorImpl.send(
> >>>>>>>>> RecordCollectorImpl.java:64)
> >>>>>>>>> ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Again it is not clear why in this case we need to shut down the
> >>>>>>> streams
> >>>>>>>>> thread and eventually the application. Shouldn't we capture this
> >>>>>>> error
> >>>>>>>>> too?
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>> Sachin
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Eno Thereska <en...@gmail.com>.
The RC candidate build is here: http://home.apache.org/~gwenshap/kafka-0.10.2.1-rc1/

Eno
> On 17 Apr 2017, at 17:20, Mahendra Kariya <ma...@go-jek.com> wrote:
> 
> Thanks!
> 
> In the meantime, is the jar published somewhere on github or as a part of
> build pipeline?
> 
> On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <en...@gmail.com>
> wrote:
> 
>> Not yet, but as soon as 0.10.2 is voted it should be. Hopefully this week.
>> 
>> Eno
>>> On 17 Apr 2017, at 13:25, Mahendra Kariya <ma...@go-jek.com> wrote:
>>> 
>>> Are the bug fix releases published to the Maven Central repository?
>>> 
>>> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <en...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Sachin,
>>>> 
>>>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>>>> max.poll.interval.ms to infinite, since in our experience with Streams
>>>> this should not be something that users set:
>>>> https://github.com/apache/kafka/pull/2770/files
>>>> 
>>>> We're in the process of documenting that change. For now you can
>>>> increase the request timeout without worrying about max.poll.interval.ms
>>>> anymore. In fact I'd suggest you also increase max.poll.interval.ms, as
>>>> we've done in the PR above.
>>>> 
>>>> Thanks
>>>> Eno
>>>> 
>>>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
>>>>> 
>>>>> Should this timeout be less than the max poll interval value? If yes,
>>>>> then generally speaking what should the ratio between the two be, or
>>>>> what range should this timeout value fall in?
>>>>> 
>>>>> Thanks
>>>>> Sachin
>>>>> 
>>>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io> wrote:
>>>>> 
>>>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>>>>>> Hi,
>>>>>> So I have added the config ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE
>>>>>> and the NotLeaderForPartitionException is gone.
>>>>>> 
>>>>>> However, we see a new exception, especially under heavy load:
>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
>>>>>> 
>>>>>> So any idea why the TimeoutException is happening?
>>>>>> Is this controlled by ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>>>>>> 
>>>>>> If yes, what value should it be set to, given that our consumer
>>>>>> max.poll.interval.ms is the default 5 minutes?
>>>>>> 
>>>>>> Is there any other setting that we should try in order to avoid such
>>>>>> errors, which cause the stream thread to die?
>>>>>> 
>>>>>> Thanks
>>>>>> Sachin
>>>>>> 
>>>>>> 
>>>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.thereska@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Sachin,
>>>>>>> 
>>>>>>> Not in this case.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>> 
>>>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> OK.
>>>>>>>> I will try this out.
>>>>>>>> 
>>>>>>>> Do I need to change anything for
>>>>>>>> max.in.flight.requests.per.connection?
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Sachin
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.thereska@gmail.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi Sachin,
>>>>>>>>> 
>>>>>>>>> For this particular error,
>>>>>>>>> “org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>>>>>>>> server is not the leader for that topic-partition.”, could you try
>>>>>>>>> setting the number of retries to something large, like this:
>>>>>>>>> 
>>>>>>>>> Properties props = new Properties();
>>>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>>>>>> ...
>>>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>>>>>> 
>>>>>>>>> This will retry the produce requests and should hopefully solve your
>>>>>>>>> immediate problem.
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Eno
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> We have encountered another series of errors which I need more help
>>>>>>>>> in understanding.
>>>>>>>>> 
>>>>>>>>> In the logs we see a message like this:
>>>>>>>>> ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]:
>>>>>>>>> org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>>>>>>>>> 
>>>>>>>>> then some milliseconds later:
>>>>>>>>> ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>>>>>>> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>>>>> 
>>>>>>>>> Finally we get this:
>>>>>>>>> ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Again it is not clear why in this case we need to shut down the
>>>>>>>>> streams thread and eventually the application. Shouldn't we capture
>>>>>>>>> this error too?
>>>>>>>>> 
>>>>>>>>> Thanks
>>>>>>>>> Sachin
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
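
Putting the settings discussed in this thread together, a minimal sketch of the configuration might look like the following. It is not taken from anyone's actual application: the class name StreamsTimeoutConfigSketch, the application id and the 60000 ms request timeout are assumptions chosen purely for illustration. Plain string keys are used so the snippet compiles without the Kafka jars; they are the string values behind StreamsConfig.APPLICATION_ID_CONFIG, ProducerConfig.RETRIES_CONFIG, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG and ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG. Integer.MAX_VALUE stands in for "infinite".

```java
import java.util.Properties;

class StreamsTimeoutConfigSketch {

    // Builds the Streams properties discussed in the thread.
    // requestTimeoutMs is an illustrative knob, not a recommended value.
    static Properties build(String applicationId, int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        // Retry produce requests (for example on NotLeaderForPartitionException).
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        // Raised from the 30 s that shows up in the "30001 ms has passed" error.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        // Effectively unbounded poll interval, mirroring the change described above.
        props.setProperty("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical application id and timeout, for demonstration only.
        Properties props = build("advice-app", 60000);
        props.list(System.out);
    }
}
```

The resulting Properties would then be handed to KafkaStreams as usual. Which request timeout is appropriate depends on broker load, so the value above should be treated as a starting point to tune rather than a recommendation.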


Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Mahendra Kariya <ma...@go-jek.com>.
Thanks!

In the meantime, is the jar published somewhere on GitHub or as part of a
build pipeline?

On Mon, Apr 17, 2017 at 9:18 PM, Eno Thereska <en...@gmail.com>
wrote:

> Not yet, but it should be as soon as 0.10.2.1 is voted on. Hopefully this week.
>
> Eno
> > On 17 Apr 2017, at 13:25, Mahendra Kariya <ma...@go-jek.com> wrote:
> >
> > Are the bug fix releases published to the Maven Central repository?
> >
> > On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <en...@gmail.com>
> > wrote:
> >
> >> Hi Sachin,
> >>
> >> In the bug fix release for 0.10.2 (and in trunk) we have now set
> >> max.poll.interval.ms to infinite, since in our experience with Streams
> >> this should not be something that users set:
> >> https://github.com/apache/kafka/pull/2770/files
> >>
> >> We're in the process of documenting that change. For now you can
> >> increase the request timeout without worrying about max.poll.interval.ms
> >> anymore. In fact I'd suggest you also increase max.poll.interval.ms, as
> >> we've done in the PR above.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
> >>>
> >>> Should this timeout be less than the max poll interval value? If yes,
> >>> then generally speaking what should the ratio between the two be, or
> >>> what range should this timeout value fall in?
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io> wrote:
> >>>
> >>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >>>> Hi,
> >>>> So I have added the config ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE
> >>>> and the NotLeaderForPartitionException is gone.
> >>>>
> >>>> However, we see a new exception, especially under heavy load:
> >>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
> >>>>
> >>>> So any idea why the TimeoutException is happening?
> >>>> Is this controlled by ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
> >>>>
> >>>> If yes, what value should it be set to, given that our consumer
> >>>> max.poll.interval.ms is the default 5 minutes?
> >>>>
> >>>> Is there any other setting that we should try in order to avoid such
> >>>> errors, which cause the stream thread to die?
> >>>>
> >>>> Thanks
> >>>> Sachin
> >>>>
> >>>>
> >>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <eno.thereska@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Sachin,
> >>>>>
> >>>>> Not in this case.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com> wrote:
> >>>>>>
> >>>>>> OK.
> >>>>>> I will try this out.
> >>>>>>
> >>>>>> Do I need to change anything for
> >>>>>> max.in.flight.requests.per.connection?
> >>>>>>
> >>>>>> Thanks
> >>>>>> Sachin
> >>>>>>
> >>>>>>
> >>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.thereska@gmail.com>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Hi Sachin,
> >>>>>>>
> >>>>>>> For this particular error,
> >>>>>>> “org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> >>>>>>> server is not the leader for that topic-partition.”, could you try
> >>>>>>> setting the number of retries to something large, like this:
> >>>>>>>
> >>>>>>> Properties props = new Properties();
> >>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>>>>>> ...
> >>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >>>>>>>
> >>>>>>> This will retry the produce requests and should hopefully solve your
> >>>>>>> immediate problem.
> >>>>>>>
> >>>>>>> Thanks
> >>>>>>> Eno
> >>>>>>>
> >>>>>>>
> >>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>  Hi,
> >>>>>>>  We have encountered another series of errors which I need more help
> >>>>>>>  in understanding.
> >>>>>>>
> >>>>>>>  In the logs we see a message like this:
> >>>>>>>  ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]:
> >>>>>>>  org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
> >>>>>>>
> >>>>>>>  then some milliseconds later:
> >>>>>>>  ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
> >>>>>>>  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
> >>>>>>>  org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>  org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> >>>>>>>
> >>>>>>>  Finally we get this:
> >>>>>>>  ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
> >>>>>>>  org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>  Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>>>
> >>>>>>>
> >>>>>>>  Again it is not clear why in this case we need to shut down the
> >>>>>>>  streams thread and eventually the application. Shouldn't we capture
> >>>>>>>  this error too?
> >>>>>>>
> >>>>>>>  Thanks
> >>>>>>>  Sachin
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>
> >>
>
>
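
On the quoted question of why the error ends up killing the thread: the "Uncaught exception" line logged on StreamThread-3 is what a thread-level handler sees once the thread is already dying. Below is a small plain-Java sketch of that mechanism, with no Kafka classes involved; the class and method names are hypothetical. In Kafka Streams a handler of this type can, as far as I know, be registered through KafkaStreams#setUncaughtExceptionHandler. It only observes the failure and does not keep the stream thread alive.

```java
import java.util.concurrent.atomic.AtomicReference;

class UncaughtHandlerSketch {

    // Runs `work` on its own thread and returns whatever exception escaped it,
    // or null if the thread finished normally.
    static Throwable runAndCapture(Runnable work) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(work, "StreamThread-sketch");
        // The handler is invoked on the dying thread; it can log or trigger a
        // restart elsewhere, but it cannot resume the thread itself.
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable captured = runAndCapture(() -> {
            throw new IllegalStateException("exception caught when producing");
        });
        System.out.println("captured: " + captured);
    }
}
```

Because the handler runs after the failure, avoiding the shutdown in the first place still comes down to the retry and timeout settings discussed in the quoted messages.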

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Eno Thereska <en...@gmail.com>.
Not yet, but it should be as soon as 0.10.2.1 is voted on. Hopefully this week.

Eno
> On 17 Apr 2017, at 13:25, Mahendra Kariya <ma...@go-jek.com> wrote:
> 
> Are the bug fix releases published to the Maven Central repository?
> 
> On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <en...@gmail.com>
> wrote:
> 
>> Hi Sachin,
>> 
>> In the bug fix release for 0.10.2 (and in trunk) we have now set
>> max.poll.interval.ms to infinite, since in our experience with Streams
>> this should not be something that users set:
>> https://github.com/apache/kafka/pull/2770/files
>> 
>> We're in the process of documenting that change. For now you can increase
>> the request timeout without worrying about max.poll.interval.ms anymore.
>> In fact I'd suggest you also increase max.poll.interval.ms, as we've done
>> in the PR above.
>> 
>> Thanks
>> Eno
>> 
>>> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
>>> 
>>> Should this timeout be less than the max poll interval value? If yes,
>>> then generally speaking what should the ratio between the two be, or
>>> what range should this timeout value fall in?
>>> 
>>> Thanks
>>> Sachin
>>> 
>>> On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io> wrote:
>>> 
>>> Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 3/31/17 11:32 AM, Sachin Mittal wrote:
>>>> Hi,
>>>> So I have added the config ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE
>>>> and the NotLeaderForPartitionException is gone.
>>>> 
>>>> However, we see a new exception, especially under heavy load:
>>>> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
>>>> 
>>>> So any idea why the TimeoutException is happening?
>>>> Is this controlled by ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
>>>> 
>>>> If yes, what value should it be set to, given that our consumer
>>>> max.poll.interval.ms is the default 5 minutes?
>>>> 
>>>> Is there any other setting that we should try in order to avoid such
>>>> errors, which cause the stream thread to die?
>>>> 
>>>> Thanks
>>>> Sachin
>>>> 
>>>> 
>>>> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <en...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi Sachin,
>>>>> 
>>>>> Not in this case.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com> wrote:
>>>>>> 
>>>>>> OK.
>>>>>> I will try this out.
>>>>>> 
>>>>>> Do I need to change anything for
>>>>>> max.in.flight.requests.per.connection?
>>>>>> 
>>>>>> Thanks
>>>>>> Sachin
>>>>>> 
>>>>>> 
>>>>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.thereska@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi Sachin,
>>>>>>> 
>>>>>>> For this particular error,
>>>>>>> “org.apache.kafka.common.errors.NotLeaderForPartitionException: This
>>>>>>> server is not the leader for that topic-partition.”, could you try
>>>>>>> setting the number of retries to something large, like this:
>>>>>>> 
>>>>>>> Properties props = new Properties();
>>>>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
>>>>>>> ...
>>>>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
>>>>>>> 
>>>>>>> This will retry the produce requests and should hopefully solve your
>>>>>>> immediate problem.
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>> 
>>>>>>> 
>>>>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com> wrote:
>>>>>>> 
>>>>>>>  Hi,
>>>>>>>  We have encountered another series of errors which I need more help
>>>>>>>  in understanding.
>>>>>>> 
>>>>>>>  In the logs we see a message like this:
>>>>>>>  ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]:
>>>>>>>  org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1] Error sending record to topic new-part-advice-key-table-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
>>>>>>> 
>>>>>>>  then some milliseconds later:
>>>>>>>  ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
>>>>>>>  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
>>>>>>>  org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>  org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
>>>>>>> 
>>>>>>>  Finally we get this:
>>>>>>>  ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice - Uncaught exception:
>>>>>>>  org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=advice-stream, partition=1, offset=48062286
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>  Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
>>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>>      at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
>>>>>>> 
>>>>>>> 
>>>>>>>  Again it is not clear why in this case we need to shut down the
>>>>>>>  streams thread and eventually the application. Shouldn't we capture
>>>>>>>  this error too?
>>>>>>> 
>>>>>>>  Thanks
>>>>>>>  Sachin
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>>> 
>> 
>> 


Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Mahendra Kariya <ma...@go-jek.com>.
Are the bug fix releases published to the Maven Central repository?

On Sat, Apr 1, 2017 at 12:26 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Sachin,
>
> In the bug fix release for 0.10.2 (and in trunk) we have now set
> max.poll.interval.ms to infinite, since in our experience with Streams this
> should not be something that users set:
> https://github.com/apache/kafka/pull/2770/files
>
> We're in the process of documenting that change. For now you can increase
> the request timeout without worrying about max.poll.interval.ms anymore. In
> fact I'd suggest you also increase max.poll.interval.ms, as we've done in
> the PR above.
>
> Thanks
> Eno
>
> > On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
> >
> > Should this timeout be less than the max poll interval value? If yes,
> > then generally speaking what should the ratio between the two be, or
> > what range should this timeout value fall in?
> >
> > Thanks
> > Sachin
> >
> > On 1 Apr 2017 04:57, "Matthias J. Sax" <ma...@confluent.io> wrote:
> >
> > Yes, you can increase ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG
> >
> >
> > -Matthias
> >
> >
> > On 3/31/17 11:32 AM, Sachin Mittal wrote:
> >> Hi,
> >> So I have added the config ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE
> >> and the NotLeaderForPartitionException is gone.
> >>
> >> However we see a new exception especially under heavy load:
> >> org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:280) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:787) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:774) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:749) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:671) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for new-part-advice-key-table-changelog-1: 30001 ms has passed since last append
> >>
> >> So any idea as to why the TimeoutException is happening?
> >> Is this controlled by
> >> ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG?
> >>
> >> If yes,
> >> what should the value be set to, given that our consumer
> >> max.poll.interval.ms is the default 5 minutes?
> >>
> >> Is there any other setting that we should try, to avoid such errors which
> >> cause the stream thread to die?
> >>
> >> Thanks
> >> Sachin
> >>
> >>
> >> On Sun, Mar 26, 2017 at 1:39 AM, Eno Thereska <en...@gmail.com>
> >> wrote:
> >>
> >>> Hi Sachin,
> >>>
> >>> Not in this case.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>> On Mar 25, 2017, at 6:19 PM, Sachin Mittal <sj...@gmail.com> wrote:
> >>>>
> >>>> OK.
> >>>> I will try this out.
> >>>>
> >>>> Do I need to change anything for
> >>>> max.in.flight.requests.per.connection?
> >>>>
> >>>> Thanks
> >>>> Sachin
> >>>>
> >>>>
> >>>> On Sat, Mar 25, 2017 at 10:59 PM, Eno Thereska <eno.thereska@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Sachin,
> >>>>>
> >>>>> For this particular error, “org.apache.kafka.common.errors.NotLeaderForPartitionException:
> >>>>> This server is not the leader for that topic-partition.”, could you try
> >>>>> setting the number of retries to something large like this:
> >>>>>
> >>>>> Properties props = new Properties();
> >>>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> >>>>> ...
> >>>>> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
> >>>>>
> >>>>> This will retry the produce requests and should hopefully solve your
> >>>>> immediate problem.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>
> >>>>> On 25/03/2017, 08:35, "Sachin Mittal" <sj...@gmail.com> wrote:
> >>>>>
> >>>>>   Hi,
> >>>>>   We have encountered another series of errors which I need more help
> >>>>>   in understanding.
> >>>>>
> >>>>>   In the logs we see a message like this:
> >>>>>   ERROR 2017-03-25 03:41:40,001 [kafka-producer-network-thread | 85-StreamThread-3-producer]:
> >>>>>   org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_1]
> >>>>>   Error sending record to topic new-part-advice-key-table-changelog. No
> >>>>>   more offsets will be recorded for this task and the exception will
> >>>>>   eventually be thrown
> >>>>>
> >>>>>   then some milliseconds later
> >>>>>   ERROR 2017-03-25 03:41:40,149 [StreamThread-3]:
> >>>>>   org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> >>>>>   [StreamThread-3] Failed while executing StreamTask 0_1 due to flush state:
> >>>>>   org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:127) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:422) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread$4.apply(StreamThread.java:555) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:513) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.flushAllState(StreamThread.java:551) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.shutdownTasksAndState(StreamThread.java:463) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.shutdown(StreamThread.java:408) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:389) [kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>   org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
> >>>>>   is not the leader for that topic-partition.
> >>>>>
> >>>>>   finally we get this
> >>>>>   ERROR 2017-03-25 03:41:45,724 [StreamThread-3]: com.advice.TestKafkaAdvice
> >>>>>   - Uncaught exception:
> >>>>>   org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >>>>>   process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000,
> >>>>>   topic=advice-stream, partition=1, offset=48062286
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:651) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:378) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>   Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_1] exception caught when producing
> >>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:64) ~[kafka-streams-0.10.2.1-SNAPSHOT.jar:na]
> >>>>>
> >>>>>
> >>>>>   Again it is not clear why in this case we need to shut down the
> >>>>>   streams thread and eventually the application. Shouldn't we capture
> >>>>>   this error too?
> >>>>>
> >>>>>   Thanks
> >>>>>   Sachin
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >>
>
>

Re: Kafka streams 0.10.2 Producer throwing exception eventually causing streams shutdown

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

In the bug fix release for 0.10.2 (and in trunk) we have now set max.poll.interval.ms to infinite, since from our experience with streams this should not be something that users set: https://github.com/apache/kafka/pull/2770/files

We're in the process of documenting that change. For now you can increase the request timeout without worrying about max.poll.interval.ms anymore. In fact, I'd suggest you also increase max.poll.interval.ms, as we've done in the change above.
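
As a rough illustration of the settings discussed in this thread, here is a minimal sketch in Java. It uses the plain key strings ("retries", "request.timeout.ms", "max.poll.interval.ms") instead of the ProducerConfig/ConsumerConfig constants so that it compiles without the Kafka jars. The class name, the helper method and the 60-second request timeout are made up for the example and are not recommended values. Using Integer.MAX_VALUE for max.poll.interval.ms is an assumption about how to approximate "infinite" by hand.

```java
import java.util.Properties;

// Hypothetical helper class, not part of Kafka.
class StreamsTimeoutSettings {

    // Collects the client overrides mentioned in this thread.
    // requestTimeoutMs is chosen by the caller; nothing here validates it.
    static Properties timeoutOverrides(int requestTimeoutMs) {
        Properties props = new Properties();
        // Producer: keep retrying transient errors such as
        // NotLeaderForPartitionException instead of failing the task.
        props.put("retries", Integer.MAX_VALUE);
        // Producer: allow more time before a record is expired
        // with a TimeoutException.
        props.put("request.timeout.ms", requestTimeoutMs);
        // Consumer: assumed stand-in for "infinite", so that long processing
        // between polls does not interfere with the larger request timeout.
        props.put("max.poll.interval.ms", Integer.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        // 60 seconds is an illustrative value only.
        Properties props = timeoutOverrides(60_000);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In a real application these entries would go into the same Properties object that already holds StreamsConfig.APPLICATION_ID_CONFIG, before the KafkaStreams instance is created.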

Thanks
Eno

> On 1 Apr 2017, at 03:28, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Should this timeout be less than the max poll interval value? If yes, then
> generally speaking, what should the ratio between the two be, or what range
> should this timeout value fall in?
> 
> Thanks
> Sachin
> 