Posted to users@kafka.apache.org by Sameer Kumar <sa...@gmail.com> on 2017/09/06 06:58:13 UTC

Kafka 11 | Stream Application crashed the brokers

Hi All,

I want to report a scenario in which running 2 different instances of my
stream application caused my brokers to crash, and eventually my stream
application as well. This scenario only happens when my brokers run on
Kafka 11; everything works fine if my brokers are on Kafka 10.2 and the
stream application is on Kafka 11.

I am attaching herewith the logs in a zipped format.

The cluster configuration
3 nodes (190, 192, 193), Kafka 11
Topic Replication Factor - 2

App configuration
Kafka 11 streams.


The error I saw on the 193 server was
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
insync replicas for partition __transaction_state-18 is [1], below required
minimum [3]. Both the 192 and 190 servers reported errors about failing to
read information from 193.

Please look for the time around 12:30-12:32 to find the relevant logs. Let
me know if you need some other information.


Regards,
-Sameer.

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Michael Noll <mi...@confluent.io>.
Thanks for reporting back, Sameer!


On Fri, Dec 1, 2017 at 2:46 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Thanks for confirming Sameer.
>
>
> Guozhang
>
> On Thu, Nov 30, 2017 at 3:52 AM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
> > Just wanted to let everyone know that this issue got fixed in Kafka 1.0.0.
> > I recently migrated to it and didn't find the issue any longer.
> >
> > -Sameer.
> >
> > On Thu, Sep 14, 2017 at 5:50 PM, Sameer Kumar <sa...@gmail.com>
> > wrote:
> >
> > > Ok. I will inspect this further and keep everyone posted on this.
> > >
> > > -Sameer.
> > >
> > > On Thu, Sep 14, 2017 at 1:46 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> > >
> > >> When exactly_once is turned on, the transactional id is set
> > >> automatically by the Streams client.
> > >>
> > >> What I'd inspect is the health of the brokers, given the
> > >> *TimeoutException*: if you have metrics on the broker servers regarding
> > >> request handler thread idleness / request queue length / request rate
> > >> etc., you can monitor those and see what the possible causes of the
> > >> broker unavailability could be.
> > >>
> > >>
> > >> Guozhang
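
To make the broker-health suggestion above actionable, here is a minimal
sketch of polling two of the metrics Guozhang mentions (request handler
idle ratio and request queue length) over JMX. The broker address is taken
from the thread; the JMX port (9999) and the exact attribute names are
assumptions and may differ across broker versions:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerHealthProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled, e.g. JMX_PORT=9999.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://172.29.65.193:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Fraction of time the request handler threads are idle (a Meter).
            Object idle = conn.getAttribute(new ObjectName(
                    "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent"),
                    "OneMinuteRate");
            // Number of requests queued waiting for a handler thread (a Gauge).
            Object queued = conn.getAttribute(new ObjectName(
                    "kafka.network:type=RequestChannel,name=RequestQueueSize"),
                    "Value");
            System.out.println("request handler avg idle (1m rate): " + idle);
            System.out.println("request queue size: " + queued);
        } finally {
            connector.close();
        }
    }
}

A sustained drop in the idle ratio together with a growing request queue on
193 would line up with the fetch timeouts discussed later in the thread.
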
> > >>
> > >>
> > >> On Wed, Sep 13, 2017 at 8:26 AM, Sameer Kumar <sam.kum.work@gmail.com
> >
> > >> wrote:
> > >>
> > >> > Adding more info:-
> > >> >
> > >> > Hi Guozhang,
> > >> >
> > >> > I was using exactly_once processing here; I can see this in the client
> > >> > logs. However, I am not setting a transactional id myself.
> > >> >
> > >> > application.id = c-7-e6
> > >> > application.server =
> > >> > bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
> > >> > 172.29.65.193:9092]
> > >> > buffered.records.per.partition = 10000
> > >> > cache.max.bytes.buffering = 2097152000
> > >> > client.id =
> > >> > commit.interval.ms = 5000
> > >> > connections.max.idle.ms = 540000
> > >> > default.key.serde = class
> > >> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > default.timestamp.extractor = class
> > >> > org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> > >> > default.value.serde = class
> > >> > org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> > >> > key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> > >> > metadata.max.age.ms = 60000
> > >> > metric.reporters = []
> > >> > metrics.num.samples = 2
> > >> > metrics.recording.level = INFO
> > >> > metrics.sample.window.ms = 30000
> > >> > num.standby.replicas = 0
> > >> > num.stream.threads = 15
> > >> > partition.grouper = class
> > >> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> > >> > poll.ms = 100
> > >> > processing.guarantee = exactly_once
> > >> > receive.buffer.bytes = 32768
> > >> > reconnect.backoff.max.ms = 1000
> > >> > reconnect.backoff.ms = 50
> > >> > replication.factor = 1
> > >> > request.timeout.ms = 40000
> > >> > retry.backoff.ms = 100
> > >> > rocksdb.config.setter = null
> > >> > security.protocol = PLAINTEXT
> > >> > send.buffer.bytes = 131072
> > >> > state.cleanup.delay.ms = 4611686018427386903
> > >> > state.dir = /data/streampoc/
> > >> > timestamp.extractor = class
> > >> > org.apache.kafka.streams.processor.WallclockTimestampExtractor
> > >> > value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> > >> > windowstore.changelog.additional.retention.ms = 86400000
> > >> > zookeeper.connect =
> > >> >
> > >> >
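
For reference, a minimal sketch of a Streams application that would produce
a config dump like the one quoted above. The application.id and
bootstrap.servers are taken from the thread; the topology and topic names
are placeholders, and the StreamsBuilder API shown is the Kafka 1.0 form
(0.11 used KStreamBuilder). The key point is that
processing.guarantee=exactly_once is the only transactional setting the
application supplies; the transactional ids are generated by the Streams
client itself:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "c-7-e6");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
                "172.29.65.190:9092,172.29.65.192:9092,172.29.65.193:9092");
        // Turns on transactions under the hood; no transactional.id is configured by the user.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Placeholder topology; the real branching/joining topology is not shown in the thread.
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
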
> > >> > On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <
> > sam.kum.work@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Guozhang,
> > >> > >
> > >> > > The producer sending data to this topic is not running concurrently
> > >> > > with the stream processing. I had first ingested the data from another
> > >> > > cluster and then ran the stream processing on it. The producer code is
> > >> > > written by me and it does not turn transactions on by default.
> > >> > >
> > >> > > I will double check if someone else has transactions turned on, but
> > >> > > this is quite unlikely. Is there some way to verify it through the logs?
> > >> > >
> > >> > > All of this behavior works fine when the brokers run on Kafka 10; this
> > >> > > might be because transactions are only available in Kafka 11. I suspect
> > >> > > there could be a case where too much processing is causing one of the
> > >> > > brokers to crash. The timeouts indicate that it is taking time to send
> > >> > > data.
> > >> > >
> > >> > > I have also tried this on another cluster, which I use exclusively for
> > >> > > myself, and found the same behavior there as well.
> > >> > >
> > >> > > What do you think our next step should be so that we can get to the
> > >> > > root of the issue?
> > >> > >
> > >> > > -Sameer.
> > >> > >
> > >> > > On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <
> wangguoz@gmail.com>
> > >> > wrote:
> > >> > >
> > >> > >> Hi Sameer,
> > >> > >>
> > >> > >> If no clients have transactions turned on, the `__transaction_state`
> > >> > >> internal topic would not be created at all. So I still suspect that
> > >> > >> some of your clients (maybe not your Streams client, but the Producer
> > >> > >> client that is sending data to the source topic?) have transactions
> > >> > >> turned on.
> > >> > >>
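
To illustrate the point about the producer: the __transaction_state topic
is only created once some client initializes transactions, either a
producer with transactional.id set or a Streams app with exactly_once. A
minimal sketch of such a producer follows, so the ingest code can be
checked for these calls; the transactional id, topic name and serializers
are illustrative, with one broker address reused from the thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.65.190:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // This setting is what makes the brokers create __transaction_state.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ingest-producer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();   // registers with the transaction coordinator
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("AdServe", "some-key", "some-value"));
            producer.commitTransaction();  // or abortTransaction() on failure
        }
    }
}

A plain producer without transactional.id (and a Streams app without
exactly_once) never touches that topic, which is one way to narrow down
which client created it.
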
> > >> > >> BTW from your logs I saw lots of the following errors on client side:
> > >> > >>
> > >> > >> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
> > >> > >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
> > >> > >> No more offsets will be recorded for this task and the exception will
> > >> > >> eventually be thrown
> > >> > >>
> > >> > >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
> > >> > >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
> > >> > >> passed since last append
> > >> > >>
> > >> > >> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
> > >> > >> correlation id 82862 on topic-partition
> > >> > >> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
> > >> > >> (2147483646 attempts left). *Error: NETWORK_EXCEPTION*
> > >> > >>
> > >> > >> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
> > >> > >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
> > >> > >> No more offsets will be recorded for this task and the exception will
> > >> > >> eventually be thrown
> > >> > >>
> > >> > >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
> > >> > >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
> > >> > >> passed since last append
> > >> > >>
> > >> > >>
> > >> > >> Today, if the TimeoutException is thrown from the recordCollector, it
> > >> > >> will cause Streams to throw this exception all the way to the user
> > >> > >> exception handler and then shut down the thread. And this exception
> > >> > >> would be thrown if the Kafka broker itself is not available (also, from
> > >> > >> your previous logs it seems brokers 190 and 192 were unavailable and
> > >> > >> hence were kicked out of the ISR by broker 193).
> > >> > >>
> > >> > >>
> > >> > >> Guozhang
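
The shutdown path described above can at least be made visible from the
application side. A minimal sketch, reusing the application.id and one
broker address from the thread, with a placeholder topology; the handler
simply logs why a stream thread died:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsDeathLogger {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "c-7-e6");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.65.190:9092");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");  // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Invoked when a stream thread dies, e.g. after the record collector
        // rethrows the TimeoutException quoted above.
        streams.setUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
        streams.start();
    }
}
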
> > >> > >>
> > >> > >>
> > >> > >>
> > >> > >> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <
> > >> sam.kum.work@gmail.com>
> > >> > >> wrote:
> > >> > >>
> > >> > >> > Hi Guozhang,
> > >> > >> >
> > >> > >> > Please find the relevant logs; see the folder for client logs as
> > >> > >> > well. Things started getting awry at 12:42:05.
> > >> > >> > Let me know if you need any more information.
> > >> > >> >
> > >> > >> > -Sameer.
> > >> > >> >
> > >> > >> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <
> > >> sam.kum.work@gmail.com
> > >> > >
> > >> > >> > wrote:
> > >> > >> >
> > >> > >> >> Hi Guozhang,
> > >> > >> >>
> > >> > >> >> Nope, I was not using exactly-once mode. I don't have the client
> > >> > >> >> logs with me right now; I will try to replicate it again and share
> > >> > >> >> the other details with you.
> > >> > >> >>
> > >> > >> >> My concern was that it crashed my brokers as well.
> > >> > >> >>
> > >> > >> >> -Sameer.
> > >> > >> >>
> > >> > >> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <
> > wangguoz@gmail.com
> > >> >
> > >> > >> wrote:
> > >> > >> >>
> > >> > >> >>> Hello Sameer,
> > >> > >> >>>
> > >> > >> >>> I looked through your code, and here is what I figured: in the 0.11
> > >> > >> >>> version we added the exactly-once feature (
> > >> > >> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > >> > >> >>> ), which uses the transaction log (internal topic named
> > >> > >> >>> "__transaction_state") that has a default replication factor of 3
> > >> > >> >>> (which will override your global config value of 2). Then at around
> > >> > >> >>> 12:30, the leader of the transaction log partitions kicked both
> > >> > >> >>> replicas, 190 and 192, out of the ISR:
> > >> > >> >>>
> > >> > >> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> > >> > >> >>> rebalance group KafkaCache_TEST15 with old generation 14
> > >> > >> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
> > >> > >> >>>
> > >> > >> >>> At the same time, the replicas on 190 and 192 seem to have timed out
> > >> > >> >>> on their fetch requests (note the big timestamp gap in the logs):
> > >> > >> >>>
> > >> > >> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1 ms. (kafka.log.Log)
> > >> > >> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> > >> > >> >>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
> > >> > >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> > >> > >> >>> logStartOffset=0, maxBytes=1048576)
> > >> > >> >>>
> > >> > >> >>> ...
> > >> > >> >>>
> > >> > >> >>> [2017-09-05 12:28:37,514] INFO Deleting index
> > >> > >> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted (kafka.log.TimeIndex)
> > >> > >> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> > >> > >> >>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
> > >> > >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> > >> > >> >>> logStartOffset=0, maxBytes=1048576)
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>> This caused the NotEnoughReplicasException, since any appends to the
> > >> > >> >>> transaction log require "acks=all and min.isr=num.replicas".
> > >> > >> >>>
> > >> > >> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
> > >> > >> >>> processing append operation on partition __transaction_state-18
> > >> > >> >>> (kafka.server.ReplicaManager)*
> > >> > >> >>>
> > >> > >> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> > >> > >> >>> insync replicas for partition __transaction_state-18 is [1], below
> > >> > >> >>> required minimum [3]*
> > >> > >> >>>
> > >> > >> >>> Upon seeing this error, the transaction coordinator should retry
> > >> > >> >>> appending, but if the retry never succeeds it will be blocked. I did
> > >> > >> >>> not see the Streams API client-side logs and so cannot tell for sure
> > >> > >> >>> why this caused the Streams app to fail as well. A quick question:
> > >> > >> >>> did you enable `processing.guarantee=exactly_once` on your Streams
> > >> > >> >>> app?
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>> Guozhang
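
The diagnosis above turns on the fact that __transaction_state is created
with its own settings (broker configs transaction.state.log.replication.factor
and transaction.state.log.min.isr) rather than the cluster-wide replication
factor of 2. A minimal sketch of checking what was actually created, using
the AdminClient that shipped with 0.11; the broker address is reused from
the thread:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class DescribeTransactionStateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.65.190:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(
                    Collections.singleton("__transaction_state"))
                    .all().get().get("__transaction_state");
            // Compare the live ISR of each partition against the broker's
            // transaction.state.log.min.isr to see why appends were rejected.
            desc.partitions().forEach(p ->
                    System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
                            p.partition(), p.leader(), p.replicas(), p.isr()));
        }
    }
}

If the ISR of a __transaction_state partition is down to a single broker,
every transactional append fails exactly as in the error quoted above.
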
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <
> > >> > sam.kum.work@gmail.com>
> > >> > >> >>> wrote:
> > >> > >> >>>
> > >> > >> >>> > Hi All,
> > >> > >> >>> >
> > >> > >> >>> >
> > >> > >> >>> > Any thoughts on the below mail?
> > >> > >> >>> >
> > >> > >> >>> > -Sameer.
> > >> > >> >>> >
> > >> > >> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
> > >> > >> sam.kum.work@gmail.com>
> > >> > >> >>> > wrote:
> > >> > >> >>> >
> > >> > >> >>> > > Hi All,
> > >> > >> >>> > >
> > >> >>> > > I want to report a scenario in which running 2 different instances
> > >> >>> > > of my stream application caused my brokers to crash, and eventually
> > >> >>> > > my stream application as well. This scenario only happens when my
> > >> >>> > > brokers run on Kafka 11; everything works fine if my brokers are on
> > >> >>> > > Kafka 10.2 and the stream application is on Kafka 11.
> > >> > >> >>> > >
> > >> > >> >>> > > I am attaching herewith the logs in a zipped format.
> > >> > >> >>> > >
> > >> > >> >>> > > The cluster configuration
> > >> >>> > > 3 nodes (190, 192, 193), Kafka 11
> > >> > >> >>> > > Topic Replication Factor - 2
> > >> > >> >>> > >
> > >> > >> >>> > > App configuration
> > >> > >> >>> > > Kafka 11 streams.
> > >> > >> >>> > >
> > >> > >> >>> > >
> > >> >>> > > The error I saw on the 193 server was
> > >> >>> > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> > >> >>> > > insync replicas for partition __transaction_state-18 is [1], below
> > >> >>> > > required minimum [3]. Both the 192 and 190 servers reported errors
> > >> >>> > > about failing to read information from 193.
> > >> > >> >>> > >
> > >> > >> >>> > > Please look for the time around 12:30-12:32 to find the
> > >> relevant
> > >> > >> >>> logs.
> > >> > >> >>> > Let
> > >> > >> >>> > > me know if you need some other information.
> > >> > >> >>> > >
> > >> > >> >>> > >
> > >> > >> >>> > > Regards,
> > >> > >> >>> > > -Sameer.
> > >> > >> >>> > >
> > >> > >> >>> >
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>>
> > >> > >> >>> --
> > >> > >> >>> -- Guozhang
> > >> > >> >>>
> > >> > >> >>
> > >> > >> >>
> > >> > >> >
> > >> > >>
> > >> > >>
> > >> > >> --
> > >> > >> -- Guozhang
> > >> > >>
> > >> > >
> > >> > >
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for confirming Sameer.


Guozhang

On Thu, Nov 30, 2017 at 3:52 AM, Sameer Kumar <sa...@gmail.com>
wrote:

> Just wanted to let everyone know that this issue got fixed in Kafka 1.0.0.
> I recently migrated to it and didn't find the issue any longer.
>
> -Sameer.



-- 
-- Guozhang

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Just wanted to let everyone know that this issue got fixed in Kafka 1.0.0.
I recently migrated to it and didn't find the issue any longer.

-Sameer.

On Thu, Sep 14, 2017 at 5:50 PM, Sameer Kumar <sa...@gmail.com>
wrote:

> Ok. I will inspect this further and keep everyone posted on this.
>
> -Sameer.

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Ok. I will inspect this further and keep everyone posted on this.

-Sameer.

On Thu, Sep 14, 2017 at 1:46 AM, Guozhang Wang <wa...@gmail.com> wrote:

> When exactly_once is turned on, the transactional id is set
> automatically by the Streams client.
>
> What I'd inspect is the health of the brokers, given the
> *TimeoutException*: if you have metrics on the broker servers regarding
> request handler thread idleness / request queue length / request rate etc.,
> you can monitor those and see what the possible causes of the broker
> unavailability could be.
>
>
> Guozhang
> > >> Error
> > >> >>> processing append operation on partition __transaction_state-18
> > >> >>> (kafka.server.ReplicaManager)*
> > >> >>>
> > >> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException:
> Number
> > of
> > >> >>> insync replicas for partition __transaction_state-18 is [1], below
> > >> >>> required
> > >> >>> minimum [3]*
> > >> >>>
> > >> >>> Upon seeing this error, the transaction coordinator should retry
> > >> >>> appending,
> > >> >>> but if the retry never succeeds it will be blocked. I did not see
> > the
> > >> >>> Streams API client-side logs and so cannot tell for sure, why this
> > >> caused
> > >> >>> the Streams app to fail as well. A quick question: did you enable
> > >> >>> `processing.mode=exactly-once` on your streams app?
> > >> >>>
> > >> >>>
> > >> >>> Guozhang
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <
> > sam.kum.work@gmail.com>
> > >> >>> wrote:
> > >> >>>
> > >> >>> > Hi All,
> > >> >>> >
> > >> >>> >
> > >> >>> > Any thoughts on the below mail.
> > >> >>> >
> > >> >>> > -Sameer.
> > >> >>> >
> > >> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
> > >> sam.kum.work@gmail.com>
> > >> >>> > wrote:
> > >> >>> >
> > >> >>> > > Hi All,
> > >> >>> > >
> > >> >>> > > I want to report a scenario wherein my running 2 different
> > >> instances
> > >> >>> of
> > >> >>> > my
> > >> >>> > > stream application caused my brokers to crash and eventually
> my
> > >> >>> stream
> > >> >>> > > application as well. This scenario only happens when my
> brokers
> > >> run
> > >> >>> on
> > >> >>> > > Kafka11, everything works fine if my brokers are on Kafka
> 10..2
> > >> and
> > >> >>> > stream
> > >> >>> > > application on Kafka11.
> > >> >>> > >
> > >> >>> > > I am attaching herewith the logs in a zipped format.
> > >> >>> > >
> > >> >>> > > The cluster configuration
> > >> >>> > > 3 nodes(190,192,193) , Kafka 11
> > >> >>> > > Topic Replication Factor - 2
> > >> >>> > >
> > >> >>> > > App configuration
> > >> >>> > > Kafka 11 streams.
> > >> >>> > >
> > >> >>> > >
> > >> >>> > > The error I saw on 193 server was org.apache.kafka.common.
> > errors.
> > >> >>> > NotEnoughReplicasException:
> > >> >>> > > Number of insync replicas for partition __transaction_state-18
> > is
> > >> >>> [1],
> > >> >>> > > below required minimum [3]. Both 192,190 servers reported
> errors
> > >> on
> > >> >>> > failure
> > >> >>> > > to read information from 193.
> > >> >>> > >
> > >> >>> > > Please look for the time around 12:30-12:32 to find the
> relevant
> > >> >>> logs.
> > >> >>> > Let
> > >> >>> > > me know if you need some other information.
> > >> >>> > >
> > >> >>> > >
> > >> >>> > > Regards,
> > >> >>> > > -Sameer.
> > >> >>> > >
> > >> >>> >
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>> --
> > >> >>> -- Guozhang
> > >> >>>
> > >> >>
> > >> >>
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Guozhang Wang <wa...@gmail.com>.
When exactly_once is turned on, the transactional id is set automatically
by the Streams client.
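
A minimal sketch of what that looks like in a 0.11 Streams app (the class
name, application id, topics and broker address below are placeholders, not
from your setup):

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ExactlyOnceSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-sketch");       // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");  // placeholder
            // This single setting switches Streams to transactional producers;
            // the transactional.id values are generated internally per task
            // and are never set by the application.
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");  // placeholder topology

            new KafkaStreams(builder, props).start();
        }
    }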

Given the *TimeoutException*, what I'd inspect is the health of the brokers.
If you have metrics on the broker servers for request handler thread
idleness, request queue length, request rate, etc., you can monitor those to
see what the possible causes of the broker unavailability might be.
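
For example, a rough sketch of polling two of those metrics over JMX (the
host is one of your brokers, the JMX port assumes JMX_PORT=9999 is exported
on the broker, and the MBean/attribute names are the standard broker
metrics):

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class BrokerHealthProbe {
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://172.29.65.193:9999/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Fraction of time the request handler threads are idle (a Meter).
                Object idle = mbs.getAttribute(new ObjectName(
                        "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent"),
                        "OneMinuteRate");
                // Number of requests currently sitting in the request queue (a Gauge).
                Object queued = mbs.getAttribute(new ObjectName(
                        "kafka.network:type=RequestChannel,name=RequestQueueSize"),
                        "Value");
                System.out.println("request handler avg idle (1m): " + idle);
                System.out.println("request queue size: " + queued);
            } finally {
                connector.close();
            }
        }
    }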


Guozhang


On Wed, Sep 13, 2017 at 8:26 AM, Sameer Kumar <sa...@gmail.com>
wrote:

> Adding more info:-
>
> Hi Guozhang,
>
> I was using exactly_once processing here, I can see this in the client
> logs, however I am not setting transaction id though.
>
> application.id = c-7-e6
> application.server =
> bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
> 172.29.65.193:9092]
> buffered.records.per.partition = 10000
> cache.max.bytes.buffering = 2097152000
> client.id =
> commit.interval.ms = 5000
> connections.max.idle.ms = 540000
> default.key.serde = class
> org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> default.timestamp.extractor = class
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp
> default.value.serde = class
> org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
> metadata.max.age.ms = 60000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> num.standby.replicas = 0
> num.stream.threads = 15
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> poll.ms = 100
> processing.guarantee = exactly_once
> receive.buffer.bytes = 32768
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> replication.factor = 1
> request.timeout.ms = 40000
> retry.backoff.ms = 100
> rocksdb.config.setter = null
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> state.cleanup.delay.ms = 4611686018427386903
> state.dir = /data/streampoc/
> timestamp.extractor = class
> org.apache.kafka.streams.processor.WallclockTimestampExtractor
> value.serde = class org.apache.kafka.common.serialization.Serdes$
> StringSerde
> windowstore.changelog.additional.retention.ms = 86400000
> zookeeper.connect =
>
>
> On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > The producer sending data to this topic is not running concurrently with
> > the stream processing. I had first ingested the data from another cluster
> > and then have the stream processing ran on it. The producer code is
> written
> > by me and it doesnt have transactions on by default.
> >
> > I will double check if someone else has transaction turned on, but this
> is
> > quite unlikely. Is there someway to verify it through logs.
> >
> > All of this behavior works fine when brokers are run on Kafka 10, this
> > might be because transactions are only available on Kafka11. I am
> > suspecting would there be a case that too much processing is causing one
> of
> > the brokers to crash. The timeouts are indicating that it is taking time
> to
> > send data
> >
> > I have tried this behavior also on a another cluster which I exclusively
> > use it for myself and found the same behavior there as well.
> >
> > What do you think should be our next step so that we can get to the root
> > of the issue.
> >
> > -Sameer.
> >
> > On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Hi Sameer,
> >>
> >> If no clients has transactions turned on the `__transaction_state`
> >> internal
> >> topic would not be created at all. So I still suspect that some of your
> >> clients (maybe not your Streams client, but your Producer client that is
> >> sending data to the source topic?) has transactions turned on.
> >>
> >> BTW from your logs I saw lots of the following errors on client side:
> >>
> >> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
> >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
> >> 0000007-repartition.
> >> No more offsets will be recorded for this task and the exception will
> >> eventually be thrown
> >>
> >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13
> record(s)
> >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
> >> passed since last append
> >>
> >> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
> >> correlation id 82862 on topic-partition
> >> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
> >> (2147483646
> >> attempts left). *Error: NETWORK_EXCEPTION*
> >>
> >> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
> >> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
> >> 0000007-repartition.
> >> No more offsets will be recorded for this task and the exception will
> >> eventually be thrown
> >>
> >> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13
> record(s)
> >> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
> >> passed since last append
> >>
> >>
> >> Today if the TimeoutException is thrown from the recordCollector it will
> >> cause the Streams to throw this exception all the way to the user
> >> exception
> >> handler and then shutdown the thread. And this exception would be thrown
> >> if
> >> the Kafka broker itself is not available (also from your previous logs
> it
> >> seems broker 192 and 193 was unavailable and hence being kicked out by
> >> broker 109 out of the IRS).
> >>
> >>
> >> Guozhang
> >>
> >>
> >>
> >> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sa...@gmail.com>
> >> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > Please find the relevant logs, see a folder for client logs as well,
> >> > things started getting awry at 12:42:05.
> >> > Let me know if you need any more information.
> >> >
> >> > -Sameer.
> >> >
> >> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sam.kum.work@gmail.com
> >
> >> > wrote:
> >> >
> >> >> Hi Guozhang,
> >> >>
> >> >> Nope, I was not using exactly-once mode. I dont have the client logs
> >> with
> >> >> me right now, I will try to replicate it again and share the other
> >> details
> >> >> with you.
> >> >>
> >> >> My concern was that it crashed my brokers as well.
> >> >>
> >> >> -Sameer.
> >> >>
> >> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Hello Sameer,
> >> >>>
> >> >>> I looked through your code, and here is what I figured: in 0.11
> >> version
> >> >>> we
> >> >>> added the exactly-once feature (
> >> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
> >> >>> xactly+Once+Delivery+and+Transactional+Messaging
> >> >>> )
> >> >>>
> >> >>> Which uses the transaction log (internal topic named
> >> >>> "__transaction_state")
> >> >>> that has a default replication of 3 (that will overwrite your global
> >> >>> config
> >> >>> value of 2). Then at around 12:30, the leader of the transation log
> >> >>> partition kicked both replicas of 190 and 192 out of the replica:
> >> >>>
> >> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> >> >>> rebalance group KafkaCache_TEST15 with old generation 14
> >> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
> >> >>>
> >> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15]
> on
> >> >>> broker 193: Shrinking ISR from 193,192,190 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
> >> >>> broker
> >> >>> 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
> >> >>>
> >> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24]
> on
> >> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >> >>> (kafka.cluster.Partition)*
> >> >>>
> >> >>> At the mean time, both replicas of 190 and 192 seems to be timed out
> >> on
> >> >>> their fetch requests (note the big timestamp gap in the logs):
> >> >>>
> >> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for
> 'AdServe-4'
> >> in
> >> >>> 1
> >> >>> ms. (kafka.log.Log)
> >> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error
> in
> >> >>> fetch
> >> >>> to broker 193, request (type=FetchRequest, replicaId=190,
> maxWait=500,
> >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >> >>> 21=(offset=0,
> >> >>> logStartOffset=0, maxBytes=1048576)
> >> >>>
> >> >>> ...
> >> >>>
> >> >>> [2017-09-05 12:28:37,514] INFO Deleting index
> >> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
> >> >>> (kafka.log.TimeIndex)
> >> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error
> in
> >> >>> fetch
> >> >>> to broker 193, request (type=FetchRequest, replicaId=192,
> maxWait=500,
> >> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >> >>> 21=(offset=0,
> >> >>> logStartOffset=0, maxBytes=1048576)
> >> >>>
> >> >>>
> >> >>>
> >> >>> This caused the NotEnoughReplicasException since any appends to the
> >> >>> transaction logs are required "acks=all, and min.isr=num.replicas".
> >> >>>
> >> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]:
> >> Error
> >> >>> processing append operation on partition __transaction_state-18
> >> >>> (kafka.server.ReplicaManager)*
> >> >>>
> >> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number
> of
> >> >>> insync replicas for partition __transaction_state-18 is [1], below
> >> >>> required
> >> >>> minimum [3]*
> >> >>>
> >> >>> Upon seeing this error, the transaction coordinator should retry
> >> >>> appending,
> >> >>> but if the retry never succeeds it will be blocked. I did not see
> the
> >> >>> Streams API client-side logs and so cannot tell for sure, why this
> >> caused
> >> >>> the Streams app to fail as well. A quick question: did you enable
> >> >>> `processing.mode=exactly-once` on your streams app?
> >> >>>
> >> >>>
> >> >>> Guozhang
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <
> sam.kum.work@gmail.com>
> >> >>> wrote:
> >> >>>
> >> >>> > Hi All,
> >> >>> >
> >> >>> >
> >> >>> > Any thoughts on the below mail.
> >> >>> >
> >> >>> > -Sameer.
> >> >>> >
> >> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
> >> sam.kum.work@gmail.com>
> >> >>> > wrote:
> >> >>> >
> >> >>> > > Hi All,
> >> >>> > >
> >> >>> > > I want to report a scenario wherein my running 2 different
> >> instances
> >> >>> of
> >> >>> > my
> >> >>> > > stream application caused my brokers to crash and eventually my
> >> >>> stream
> >> >>> > > application as well. This scenario only happens when my brokers
> >> run
> >> >>> on
> >> >>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2
> >> and
> >> >>> > stream
> >> >>> > > application on Kafka11.
> >> >>> > >
> >> >>> > > I am attaching herewith the logs in a zipped format.
> >> >>> > >
> >> >>> > > The cluster configuration
> >> >>> > > 3 nodes(190,192,193) , Kafka 11
> >> >>> > > Topic Replication Factor - 2
> >> >>> > >
> >> >>> > > App configuration
> >> >>> > > Kafka 11 streams.
> >> >>> > >
> >> >>> > >
> >> >>> > > The error I saw on 193 server was org.apache.kafka.common.
> errors.
> >> >>> > NotEnoughReplicasException:
> >> >>> > > Number of insync replicas for partition __transaction_state-18
> is
> >> >>> [1],
> >> >>> > > below required minimum [3]. Both 192,190 servers reported errors
> >> on
> >> >>> > failure
> >> >>> > > to read information from 193.
> >> >>> > >
> >> >>> > > Please look for the time around 12:30-12:32 to find the relevant
> >> >>> logs.
> >> >>> > Let
> >> >>> > > me know if you need some other information.
> >> >>> > >
> >> >>> > >
> >> >>> > > Regards,
> >> >>> > > -Sameer.
> >> >>> > >
> >> >>> >
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Adding more info:-

Hi Guozhang,

I was using exactly_once processing here; I can see this in the client
logs. However, I am not setting a transactional id myself.

application.id = c-7-e6
application.server =
bootstrap.servers = [172.29.65.190:9092, 172.29.65.192:9092,
172.29.65.193:9092]
buffered.records.per.partition = 10000
cache.max.bytes.buffering = 2097152000
client.id =
commit.interval.ms = 5000
connections.max.idle.ms = 540000
default.key.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
key.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
metadata.max.age.ms = 60000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
num.standby.replicas = 0
num.stream.threads = 15
partition.grouper = class
org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms = 100
processing.guarantee = exactly_once
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
replication.factor = 1
request.timeout.ms = 40000
retry.backoff.ms = 100
rocksdb.config.setter = null
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
state.cleanup.delay.ms = 4611686018427386903
state.dir = /data/streampoc/
timestamp.extractor = class
org.apache.kafka.streams.processor.WallclockTimestampExtractor
value.serde = class org.apache.kafka.common.serialization.Serdes$StringSerde
windowstore.changelog.additional.retention.ms = 86400000
zookeeper.connect =
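
For reference, a small sketch of how a couple of the values above are set
programmatically (only to make the dump concrete; the constants come from
org.apache.kafka.streams.StreamsConfig):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    // processing.guarantee = exactly_once
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    // num.stream.threads = 15
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 15);
    // cache.max.bytes.buffering = 2097152000
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 2097152000L);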


On Wed, Sep 13, 2017 at 12:16 PM, Sameer Kumar <sa...@gmail.com>
wrote:

> Hi Guozhang,
>
> The producer sending data to this topic is not running concurrently with
> the stream processing. I had first ingested the data from another cluster
> and then have the stream processing ran on it. The producer code is written
> by me and it doesnt have transactions on by default.
>
> I will double check if someone else has transaction turned on, but this is
> quite unlikely. Is there someway to verify it through logs.
>
> All of this behavior works fine when brokers are run on Kafka 10, this
> might be because transactions are only available on Kafka11. I am
> suspecting would there be a case that too much processing is causing one of
> the brokers to crash. The timeouts are indicating that it is taking time to
> send data
>
> I have tried this behavior also on a another cluster which I exclusively
> use it for myself and found the same behavior there as well.
>
> What do you think should be our next step so that we can get to the root
> of the issue.
>
> -Sameer.
>
> On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hi Sameer,
>>
>> If no clients has transactions turned on the `__transaction_state`
>> internal
>> topic would not be created at all. So I still suspect that some of your
>> clients (maybe not your Streams client, but your Producer client that is
>> sending data to the source topic?) has transactions turned on.
>>
>> BTW from your logs I saw lots of the following errors on client side:
>>
>> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
>> 0000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown
>>
>> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
>> passed since last append
>>
>> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
>> correlation id 82862 on topic-partition
>> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
>> (2147483646
>> attempts left). *Error: NETWORK_EXCEPTION*
>>
>> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
>> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-000
>> 0000007-repartition.
>> No more offsets will be recorded for this task and the exception will
>> eventually be thrown
>>
>> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
>> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
>> passed since last append
>>
>>
>> Today if the TimeoutException is thrown from the recordCollector it will
>> cause the Streams to throw this exception all the way to the user
>> exception
>> handler and then shutdown the thread. And this exception would be thrown
>> if
>> the Kafka broker itself is not available (also from your previous logs it
>> seems broker 192 and 193 was unavailable and hence being kicked out by
>> broker 109 out of the IRS).
>>
>>
>> Guozhang
>>
>>
>>
>> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sa...@gmail.com>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > Please find the relevant logs, see a folder for client logs as well,
>> > things started getting awry at 12:42:05.
>> > Let me know if you need any more information.
>> >
>> > -Sameer.
>> >
>> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sa...@gmail.com>
>> > wrote:
>> >
>> >> Hi Guozhang,
>> >>
>> >> Nope, I was not using exactly-once mode. I dont have the client logs
>> with
>> >> me right now, I will try to replicate it again and share the other
>> details
>> >> with you.
>> >>
>> >> My concern was that it crashed my brokers as well.
>> >>
>> >> -Sameer.
>> >>
>> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >>
>> >>> Hello Sameer,
>> >>>
>> >>> I looked through your code, and here is what I figured: in 0.11
>> version
>> >>> we
>> >>> added the exactly-once feature (
>> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
>> >>> xactly+Once+Delivery+and+Transactional+Messaging
>> >>> )
>> >>>
>> >>> Which uses the transaction log (internal topic named
>> >>> "__transaction_state")
>> >>> that has a default replication of 3 (that will overwrite your global
>> >>> config
>> >>> value of 2). Then at around 12:30, the leader of the transation log
>> >>> partition kicked both replicas of 190 and 192 out of the replica:
>> >>>
>> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
>> >>> rebalance group KafkaCache_TEST15 with old generation 14
>> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>> >>>
>> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
>> >>> broker 193: Shrinking ISR from 193,192,190 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
>> >>> broker
>> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>> >>>
>> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
>> >>> broker 193: Shrinking ISR from 193,190,192 to 193
>> >>> (kafka.cluster.Partition)*
>> >>>
>> >>> At the mean time, both replicas of 190 and 192 seems to be timed out
>> on
>> >>> their fetch requests (note the big timestamp gap in the logs):
>> >>>
>> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4'
>> in
>> >>> 1
>> >>> ms. (kafka.log.Log)
>> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in
>> >>> fetch
>> >>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
>> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> >>> 21=(offset=0,
>> >>> logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>> ...
>> >>>
>> >>> [2017-09-05 12:28:37,514] INFO Deleting index
>> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
>> >>> (kafka.log.TimeIndex)
>> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in
>> >>> fetch
>> >>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
>> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> >>> 21=(offset=0,
>> >>> logStartOffset=0, maxBytes=1048576)
>> >>>
>> >>>
>> >>>
>> >>> This caused the NotEnoughReplicasException since any appends to the
>> >>> transaction logs are required "acks=all, and min.isr=num.replicas".
>> >>>
>> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]:
>> Error
>> >>> processing append operation on partition __transaction_state-18
>> >>> (kafka.server.ReplicaManager)*
>> >>>
>> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
>> >>> insync replicas for partition __transaction_state-18 is [1], below
>> >>> required
>> >>> minimum [3]*
>> >>>
>> >>> Upon seeing this error, the transaction coordinator should retry
>> >>> appending,
>> >>> but if the retry never succeeds it will be blocked. I did not see the
>> >>> Streams API client-side logs and so cannot tell for sure, why this
>> caused
>> >>> the Streams app to fail as well. A quick question: did you enable
>> >>> `processing.mode=exactly-once` on your streams app?
>> >>>
>> >>>
>> >>> Guozhang
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com>
>> >>> wrote:
>> >>>
>> >>> > Hi All,
>> >>> >
>> >>> >
>> >>> > Any thoughts on the below mail.
>> >>> >
>> >>> > -Sameer.
>> >>> >
>> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
>> sam.kum.work@gmail.com>
>> >>> > wrote:
>> >>> >
>> >>> > > Hi All,
>> >>> > >
>> >>> > > I want to report a scenario wherein my running 2 different
>> instances
>> >>> of
>> >>> > my
>> >>> > > stream application caused my brokers to crash and eventually my
>> >>> stream
>> >>> > > application as well. This scenario only happens when my brokers
>> run
>> >>> on
>> >>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2
>> and
>> >>> > stream
>> >>> > > application on Kafka11.
>> >>> > >
>> >>> > > I am attaching herewith the logs in a zipped format.
>> >>> > >
>> >>> > > The cluster configuration
>> >>> > > 3 nodes(190,192,193) , Kafka 11
>> >>> > > Topic Replication Factor - 2
>> >>> > >
>> >>> > > App configuration
>> >>> > > Kafka 11 streams.
>> >>> > >
>> >>> > >
>> >>> > > The error I saw on 193 server was org.apache.kafka.common.errors.
>> >>> > NotEnoughReplicasException:
>> >>> > > Number of insync replicas for partition __transaction_state-18 is
>> >>> [1],
>> >>> > > below required minimum [3]. Both 192,190 servers reported errors
>> on
>> >>> > failure
>> >>> > > to read information from 193.
>> >>> > >
>> >>> > > Please look for the time around 12:30-12:32 to find the relevant
>> >>> logs.
>> >>> > Let
>> >>> > > me know if you need some other information.
>> >>> > >
>> >>> > >
>> >>> > > Regards,
>> >>> > > -Sameer.
>> >>> > >
>> >>> >
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> -- Guozhang
>> >>>
>> >>
>> >>
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Hi Guozhang,

The producer sending data to this topic is not running concurrently with
the stream processing. I first ingested the data from another cluster and
then ran the stream processing on it. The producer code is written by me,
and it does not have transactions turned on by default.

I will double-check whether someone else has transactions turned on, but
this is quite unlikely. Is there some way to verify it through the logs?

All of this behavior works fine when the brokers run on Kafka 10, which
might be because transactions are only available on Kafka 11. I suspect
there might be a case where too much processing is causing one of the
brokers to crash; the timeouts indicate that it is taking a long time to
send data.

I have also tried this on another cluster, which I use exclusively for
myself, and found the same behavior there as well.

What do you think our next step should be so that we can get to the root of
the issue?

-Sameer.

On Wed, Sep 13, 2017 at 6:14 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Sameer,
>
> If no clients has transactions turned on the `__transaction_state` internal
> topic would not be created at all. So I still suspect that some of your
> clients (maybe not your Streams client, but your Producer client that is
> sending data to the source topic?) has transactions turned on.
>
> BTW from your logs I saw lots of the following errors on client side:
>
> 2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
> No more offsets will be recorded for this task and the exception will
> eventually be thrown
>
> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
> passed since last append
>
> 2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
> correlation id 82862 on topic-partition
> c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying
> (2147483646
> attempts left). *Error: NETWORK_EXCEPTION*
>
> 2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
> sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
> No more offsets will be recorded for this task and the exception will
> eventually be thrown
>
> org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
> for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
> passed since last append
>
>
> Today if the TimeoutException is thrown from the recordCollector it will
> cause the Streams to throw this exception all the way to the user exception
> handler and then shutdown the thread. And this exception would be thrown if
> the Kafka broker itself is not available (also from your previous logs it
> seems broker 192 and 193 was unavailable and hence being kicked out by
> broker 109 out of the IRS).
>
>
> Guozhang
>
>
>
> On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
> > Hi Guozhang,
> >
> > Please find the relevant logs, see a folder for client logs as well,
> > things started getting awry at 12:42:05.
> > Let me know if you need any more information.
> >
> > -Sameer.
> >
> > On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sa...@gmail.com>
> > wrote:
> >
> >> Hi Guozhang,
> >>
> >> Nope, I was not using exactly-once mode. I dont have the client logs
> with
> >> me right now, I will try to replicate it again and share the other
> details
> >> with you.
> >>
> >> My concern was that it crashed my brokers as well.
> >>
> >> -Sameer.
> >>
> >> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >>
> >>> Hello Sameer,
> >>>
> >>> I looked through your code, and here is what I figured: in 0.11 version
> >>> we
> >>> added the exactly-once feature (
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
> >>> xactly+Once+Delivery+and+Transactional+Messaging
> >>> )
> >>>
> >>> Which uses the transaction log (internal topic named
> >>> "__transaction_state")
> >>> that has a default replication of 3 (that will overwrite your global
> >>> config
> >>> value of 2). Then at around 12:30, the leader of the transation log
> >>> partition kicked both replicas of 190 and 192 out of the replica:
> >>>
> >>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> >>> rebalance group KafkaCache_TEST15 with old generation 14
> >>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
> >>>
> >>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
> >>> broker
> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
> >>> broker
> >>> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
> >>> broker
> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >>> (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
> >>> broker 193: Shrinking ISR from 193,192,190 to 193
> >>> (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >>> (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
> >>> broker
> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
> >>> broker
> >>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
> >>>
> >>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
> >>> broker 193: Shrinking ISR from 193,190,192 to 193
> >>> (kafka.cluster.Partition)*
> >>>
> >>> At the mean time, both replicas of 190 and 192 seems to be timed out on
> >>> their fetch requests (note the big timestamp gap in the logs):
> >>>
> >>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4'
> in
> >>> 1
> >>> ms. (kafka.log.Log)
> >>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in
> >>> fetch
> >>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >>> 21=(offset=0,
> >>> logStartOffset=0, maxBytes=1048576)
> >>>
> >>> ...
> >>>
> >>> [2017-09-05 12:28:37,514] INFO Deleting index
> >>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
> >>> (kafka.log.TimeIndex)
> >>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in
> >>> fetch
> >>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
> >>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
> >>> 21=(offset=0,
> >>> logStartOffset=0, maxBytes=1048576)
> >>>
> >>>
> >>>
> >>> This caused the NotEnoughReplicasException since any appends to the
> >>> transaction logs are required "acks=all, and min.isr=num.replicas".
> >>>
> >>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
> >>> processing append operation on partition __transaction_state-18
> >>> (kafka.server.ReplicaManager)*
> >>>
> >>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> >>> insync replicas for partition __transaction_state-18 is [1], below
> >>> required
> >>> minimum [3]*
> >>>
> >>> Upon seeing this error, the transaction coordinator should retry
> >>> appending,
> >>> but if the retry never succeeds it will be blocked. I did not see the
> >>> Streams API client-side logs and so cannot tell for sure, why this
> caused
> >>> the Streams app to fail as well. A quick question: did you enable
> >>> `processing.mode=exactly-once` on your streams app?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>>
> >>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi All,
> >>> >
> >>> >
> >>> > Any thoughts on the below mail.
> >>> >
> >>> > -Sameer.
> >>> >
> >>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <
> sam.kum.work@gmail.com>
> >>> > wrote:
> >>> >
> >>> > > Hi All,
> >>> > >
> >>> > > I want to report a scenario wherein my running 2 different
> instances
> >>> of
> >>> > my
> >>> > > stream application caused my brokers to crash and eventually my
> >>> stream
> >>> > > application as well. This scenario only happens when my brokers run
> >>> on
> >>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2 and
> >>> > stream
> >>> > > application on Kafka11.
> >>> > >
> >>> > > I am attaching herewith the logs in a zipped format.
> >>> > >
> >>> > > The cluster configuration
> >>> > > 3 nodes(190,192,193) , Kafka 11
> >>> > > Topic Replication Factor - 2
> >>> > >
> >>> > > App configuration
> >>> > > Kafka 11 streams.
> >>> > >
> >>> > >
> >>> > > The error I saw on 193 server was org.apache.kafka.common.errors.
> >>> > NotEnoughReplicasException:
> >>> > > Number of insync replicas for partition __transaction_state-18 is
> >>> [1],
> >>> > > below required minimum [3]. Both 192,190 servers reported errors on
> >>> > failure
> >>> > > to read information from 193.
> >>> > >
> >>> > > Please look for the time around 12:30-12:32 to find the relevant
> >>> logs.
> >>> > Let
> >>> > > me know if you need some other information.
> >>> > >
> >>> > >
> >>> > > Regards,
> >>> > > -Sameer.
> >>> > >
> >>> >
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >
>
>
> --
> -- Guozhang
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Sameer,

If no client has transactions turned on, the `__transaction_state` internal
topic would not be created at all. So I still suspect that one of your
clients (maybe not your Streams client, but the producer client that is
sending data to the source topic?) has transactions turned on.
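
One quick way to check is to see whether that topic exists at all; a rough
sketch using the 0.11 AdminClient (the broker address is taken from your
config, the class name is just a placeholder):

    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;

    public class CheckTransactionTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.29.65.190:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                boolean exists = admin
                        .listTopics(new ListTopicsOptions().listInternal(true))
                        .names().get()
                        .contains("__transaction_state");
                // If this prints false, no producer has ever initialized
                // transactions against this cluster.
                System.out.println("__transaction_state exists: " + exists);
            }
        }
    }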

BTW, from your logs I saw lots of the following errors on the client side:

2017-09-11 12:42:34 ERROR RecordCollectorImpl:113 - task [0_6] Error
sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
No more offsets will be recorded for this task and the exception will
eventually be thrown

org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31174 ms has
passed since last append

2017-09-11 12:42:36 WARN  Sender:511 - Got error produce response with
correlation id 82862 on topic-partition
c-7-e6-KSTREAM-JOINTHIS-0000000018-store-changelog-22, retrying (2147483646
attempts left). *Error: NETWORK_EXCEPTION*

2017-09-11 12:42:36 ERROR RecordCollectorImpl:113 - task [0_22] Error
sending record to topic c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition.
No more offsets will be recorded for this task and the exception will
eventually be thrown

org.apache.kafka.common.errors.*TimeoutException*: Expiring 13 record(s)
for c-7-e6-KSTREAM-BRANCHCHILD-0000000007-repartition-3: 31467 ms has
passed since last append


Today, if the TimeoutException is thrown from the record collector, it will
cause Streams to throw this exception all the way to the user exception
handler and then shut down the thread. And this exception would be thrown if
the Kafka broker itself is not available (also, from your previous logs it
seems brokers 190 and 192 were unavailable and hence were kicked out of the
ISR by broker 193).
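
If you want the application to at least surface this rather than just have
the thread die, a small sketch (assuming `streams` is your KafkaStreams
instance and the handler is registered before streams.start()):

    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // For the failure above, e would typically be a StreamsException
            // wrapping the producer TimeoutException once the record
            // collector gives up.
            System.err.println("Stream thread " + t.getName() + " died: " + e);
            // react here, e.g. alert, or close the application and restart it
        }
    });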


Guozhang



On Mon, Sep 11, 2017 at 3:40 AM, Sameer Kumar <sa...@gmail.com>
wrote:

> Hi Guozhang,
>
> Please find the relevant logs, see a folder for client logs as well,
> things started getting awry at 12:42:05.
> Let me know if you need any more information.
>
> -Sameer.
>
> On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
>> Hi Guozhang,
>>
>> Nope, I was not using exactly-once mode. I dont have the client logs with
>> me right now, I will try to replicate it again and share the other details
>> with you.
>>
>> My concern was that it crashed my brokers as well.
>>
>> -Sameer.
>>
>> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Hello Sameer,
>>>
>>> I looked through your code, and here is what I figured: in 0.11 version
>>> we
>>> added the exactly-once feature (
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E
>>> xactly+Once+Delivery+and+Transactional+Messaging
>>> )
>>>
>>> Which uses the transaction log (internal topic named
>>> "__transaction_state")
>>> that has a default replication of 3 (that will overwrite your global
>>> config
>>> value of 2). Then at around 12:30, the leader of the transation log
>>> partition kicked both replicas of 190 and 192 out of the replica:
>>>
>>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
>>> rebalance group KafkaCache_TEST15 with old generation 14
>>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>>>
>>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
>>> broker
>>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
>>> broker
>>> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
>>> broker
>>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
>>> broker 193: Shrinking ISR from 193,190,192 to 193
>>> (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
>>> broker 193: Shrinking ISR from 193,192,190 to 193
>>> (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
>>> broker 193: Shrinking ISR from 193,190,192 to 193
>>> (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
>>> broker
>>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
>>> broker
>>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>>
>>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
>>> broker 193: Shrinking ISR from 193,190,192 to 193
>>> (kafka.cluster.Partition)*
>>>
>>> At the mean time, both replicas of 190 and 192 seems to be timed out on
>>> their fetch requests (note the big timestamp gap in the logs):
>>>
>>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in
>>> 1
>>> ms. (kafka.log.Log)
>>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in
>>> fetch
>>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
>>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>>> 21=(offset=0,
>>> logStartOffset=0, maxBytes=1048576)
>>>
>>> ...
>>>
>>> [2017-09-05 12:28:37,514] INFO Deleting index
>>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
>>> (kafka.log.TimeIndex)
>>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in
>>> fetch
>>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
>>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>>> 21=(offset=0,
>>> logStartOffset=0, maxBytes=1048576)
>>>
>>>
>>>
>>> This caused the NotEnoughReplicasException since any appends to the
>>> transaction logs are required "acks=all, and min.isr=num.replicas".
>>>
>>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
>>> processing append operation on partition __transaction_state-18
>>> (kafka.server.ReplicaManager)*
>>>
>>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
>>> insync replicas for partition __transaction_state-18 is [1], below
>>> required
>>> minimum [3]*
>>>
>>> Upon seeing this error, the transaction coordinator should retry
>>> appending,
>>> but if the retry never succeeds it will be blocked. I did not see the
>>> Streams API client-side logs and so cannot tell for sure, why this caused
>>> the Streams app to fail as well. A quick question: did you enable
>>> `processing.mode=exactly-once` on your streams app?
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>>
>>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com>
>>> wrote:
>>>
>>> > Hi All,
>>> >
>>> >
>>> > Any thoughts on the below mail.
>>> >
>>> > -Sameer.
>>> >
>>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sa...@gmail.com>
>>> > wrote:
>>> >
>>> > > Hi All,
>>> > >
>>> > > I want to report a scenario wherein my running 2 different instances
>>> of
>>> > my
>>> > > stream application caused my brokers to crash and eventually my
>>> stream
>>> > > application as well. This scenario only happens when my brokers run
>>> on
>>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2 and
>>> > stream
>>> > > application on Kafka11.
>>> > >
>>> > > I am attaching herewith the logs in a zipped format.
>>> > >
>>> > > The cluster configuration
>>> > > 3 nodes(190,192,193) , Kafka 11
>>> > > Topic Replication Factor - 2
>>> > >
>>> > > App configuration
>>> > > Kafka 11 streams.
>>> > >
>>> > >
>>> > > The error I saw on 193 server was org.apache.kafka.common.errors.
>>> > NotEnoughReplicasException:
>>> > > Number of insync replicas for partition __transaction_state-18 is
>>> [1],
>>> > > below required minimum [3]. Both 192,190 servers reported errors on
>>> > failure
>>> > > to read information from 193.
>>> > >
>>> > > Please look for the time around 12:30-12:32 to find the relevant
>>> logs.
>>> > Let
>>> > > me know if you need some other information.
>>> > >
>>> > >
>>> > > Regards,
>>> > > -Sameer.
>>> > >
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>


-- 
-- Guozhang

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Hi Guozhang,

Please find the relevant logs; see the folder for client logs as well.
Things started getting awry at 12:42:05.
Let me know if you need any more information.

-Sameer.

On Sun, Sep 10, 2017 at 5:06 PM, Sameer Kumar <sa...@gmail.com>
wrote:

> Hi Guozhang,
>
> Nope, I was not using exactly-once mode. I dont have the client logs with
> me right now, I will try to replicate it again and share the other details
> with you.
>
> My concern was that it crashed my brokers as well.
>
> -Sameer.
>
> On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Hello Sameer,
>>
>> I looked through your code, and here is what I figured: in 0.11 version we
>> added the exactly-once feature (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+
>> Exactly+Once+Delivery+and+Transactional+Messaging
>> )
>>
>> Which uses the transaction log (internal topic named
>> "__transaction_state")
>> that has a default replication of 3 (that will overwrite your global
>> config
>> value of 2). Then at around 12:30, the leader of the transation log
>> partition kicked both replicas of 190 and 192 out of the replica:
>>
>> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
>> rebalance group KafkaCache_TEST15 with old generation 14
>> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>>
>> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on
>> broker
>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on
>> broker
>> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on
>> broker
>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
>> broker 193: Shrinking ISR from 193,190,192 to 193
>> (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
>> broker 193: Shrinking ISR from 193,192,190 to 193
>> (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
>> broker 193: Shrinking ISR from 193,190,192 to 193
>> (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on
>> broker
>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on
>> broker
>> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>>
>> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
>> broker 193: Shrinking ISR from 193,190,192 to 193
>> (kafka.cluster.Partition)*
>>
>> At the mean time, both replicas of 190 and 192 seems to be timed out on
>> their fetch requests (note the big timestamp gap in the logs):
>>
>> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1
>> ms. (kafka.log.Log)
>> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in
>> fetch
>> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> 21=(offset=0,
>> logStartOffset=0, maxBytes=1048576)
>>
>> ...
>>
>> [2017-09-05 12:28:37,514] INFO Deleting index
>> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
>> (kafka.log.TimeIndex)
>> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in
>> fetch
>> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
>> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-
>> 21=(offset=0,
>> logStartOffset=0, maxBytes=1048576)
>>
>>
>>
>> This caused the NotEnoughReplicasException since any appends to the
>> transaction logs are required "acks=all, and min.isr=num.replicas".
>>
>> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
>> processing append operation on partition __transaction_state-18
>> (kafka.server.ReplicaManager)*
>>
>> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
>> insync replicas for partition __transaction_state-18 is [1], below
>> required
>> minimum [3]*
>>
>> Upon seeing this error, the transaction coordinator should retry
>> appending,
>> but if the retry never succeeds it will be blocked. I did not see the
>> Streams API client-side logs and so cannot tell for sure, why this caused
>> the Streams app to fail as well. A quick question: did you enable
>> `processing.mode=exactly-once` on your streams app?
>>
>>
>> Guozhang
>>
>>
>>
>>
>> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com>
>> wrote:
>>
>> > Hi All,
>> >
>> >
>> > Any thoughts on the below mail.
>> >
>> > -Sameer.
>> >
>> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sa...@gmail.com>
>> > wrote:
>> >
>> > > Hi All,
>> > >
>> > > I want to report a scenario wherein my running 2 different instances
>> of
>> > my
>> > > stream application caused my brokers to crash and eventually my stream
>> > > application as well. This scenario only happens when my brokers run on
>> > > Kafka11, everything works fine if my brokers are on Kafka 10..2 and
>> > stream
>> > > application on Kafka11.
>> > >
>> > > I am attaching herewith the logs in a zipped format.
>> > >
>> > > The cluster configuration
>> > > 3 nodes(190,192,193) , Kafka 11
>> > > Topic Replication Factor - 2
>> > >
>> > > App configuration
>> > > Kafka 11 streams.
>> > >
>> > >
>> > > The error I saw on 193 server was org.apache.kafka.common.errors.
>> > NotEnoughReplicasException:
>> > > Number of insync replicas for partition __transaction_state-18 is [1],
>> > > below required minimum [3]. Both 192,190 servers reported errors on
>> > failure
>> > > to read information from 193.
>> > >
>> > > Please look for the time around 12:30-12:32 to find the relevant logs.
>> > Let
>> > > me know if you need some other information.
>> > >
>> > >
>> > > Regards,
>> > > -Sameer.
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Hi Guozhang,

Nope, I was not using exactly-once mode. I don't have the client logs with
me right now; I will try to replicate it again and share the other details
with you.

My concern was that it crashed my brokers as well.

-Sameer.

On Sat, Sep 9, 2017 at 1:51 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Sameer,
>
> I looked through your code, and here is what I figured: in 0.11 version we
> added the exactly-once feature (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> )
>
> Which uses the transaction log (internal topic named "__transaction_state")
> that has a default replication of 3 (that will overwrite your global config
> value of 2). Then at around 12:30, the leader of the transation log
> partition kicked both replicas of 190 and 192 out of the replica:
>
> [2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
> rebalance group KafkaCache_TEST15 with old generation 14
> (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
>
> *[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on broker
> 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
> broker 193: Shrinking ISR from 193,192,190 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on broker
> 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*
>
> *[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
> broker 193: Shrinking ISR from 193,190,192 to 193
> (kafka.cluster.Partition)*
>
> In the meantime, both replicas, 190 and 192, seem to have timed out on
> their fetch requests (note the big timestamp gap in the logs):
>
> [2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1
> ms. (kafka.log.Log)
> [2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> logStartOffset=0, maxBytes=1048576)
>
> ...
>
> [2017-09-05 12:28:37,514] INFO Deleting index
> /data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
> (kafka.log.TimeIndex)
> [2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in fetch
> to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
> minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
> logStartOffset=0, maxBytes=1048576)
>
>
>
> This caused the NotEnoughReplicasException, since any appends to the
> transaction log require "acks=all" with min.isr equal to the number of replicas.
>
> *[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
> processing append operation on partition __transaction_state-18
> (kafka.server.ReplicaManager)*
>
> *org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> insync replicas for partition __transaction_state-18 is [1], below required
> minimum [3]*
>
> Upon seeing this error, the transaction coordinator should retry appending,
> but if the retry never succeeds it will be blocked. I did not see the
> Streams API client-side logs and so cannot tell for sure why this caused
> the Streams app to fail as well. A quick question: did you enable
> `processing.mode=exactly-once` on your streams app?
>
>
> Guozhang
>
>
>
>
> On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
> > Hi All,
> >
> >
> > Any thoughts on the below mail.
> >
> > -Sameer.
> >
> > On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sa...@gmail.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I want to report a scenario wherein my running 2 different instances of
> > my
> > > stream application caused my brokers to crash and eventually my stream
> > > application as well. This scenario only happens when my brokers run on
> > > Kafka11, everything works fine if my brokers are on Kafka 10..2 and
> > stream
> > > application on Kafka11.
> > >
> > > I am attaching herewith the logs in a zipped format.
> > >
> > > The cluster configuration
> > > 3 nodes(190,192,193) , Kafka 11
> > > Topic Replication Factor - 2
> > >
> > > App configuration
> > > Kafka 11 streams.
> > >
> > >
> > > The error I saw on 193 server was org.apache.kafka.common.errors.
> > NotEnoughReplicasException:
> > > Number of insync replicas for partition __transaction_state-18 is [1],
> > > below required minimum [3]. Both 192,190 servers reported errors on
> > failure
> > > to read information from 193.
> > >
> > > Please look for the time around 12:30-12:32 to find the relevant logs.
> > Let
> > > me know if you need some other information.
> > >
> > >
> > > Regards,
> > > -Sameer.
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Sameer,

I looked through your code, and here is what I figured: in the 0.11 release
we added the exactly-once feature (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
),

which uses the transaction log (an internal topic named "__transaction_state")
that has a default replication factor of 3 (overriding your global config
value of 2). Then, at around 12:30, the leader of the transaction log
partitions (broker 193) kicked replicas 190 and 192 out of the ISR:

[2017-09-05 12:30:31,256] INFO [GroupCoordinator 193]: Preparing to
rebalance group KafkaCache_TEST15 with old generation 14
(__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)

*[2017-09-05 12:30:41,510] INFO Partition [__transaction_state,9] on broker
193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,513] INFO Partition [__transaction_state,6] on broker
193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,514] INFO Partition [__transaction_state,3] on broker
193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,18] on
broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,515] INFO Partition [__transaction_state,15] on
broker 193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,516] INFO Partition [__transaction_state,12] on
broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,516] INFO Partition [__consumer_offsets,12] on broker
193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,517] INFO Partition [__consumer_offsets,15] on broker
193: Shrinking ISR from 193,192,190 to 193 (kafka.cluster.Partition)*

*[2017-09-05 12:30:41,517] INFO Partition [__transaction_state,24] on
broker 193: Shrinking ISR from 193,190,192 to 193 (kafka.cluster.Partition)*
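
For reference, the ISR state of this internal topic can be inspected directly
with the stock admin tool; a sketch using the ZooKeeper connection string as
required by the 0.11-era tooling, where the host is a placeholder:

bin/kafka-topics.sh --describe --zookeeper zk-host:2181 --topic __transaction_state

The output lists, per partition, the leader, the assigned replicas, and the
current ISR, which in a healthy cluster should match the replica list.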

In the meantime, both replicas, 190 and 192, seem to have timed out on
their fetch requests (note the big timestamp gap in the logs):

[2017-09-05 12:26:21,130] INFO Rolled new log segment for 'AdServe-4' in 1
ms. (kafka.log.Log)
[2017-09-05 12:30:59,046] WARN [ReplicaFetcherThread-2-193]: Error in fetch
to broker 193, request (type=FetchRequest, replicaId=190, maxWait=500,
minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
logStartOffset=0, maxBytes=1048576)

...

[2017-09-05 12:28:37,514] INFO Deleting index
/data1/kafka/AdServe-5/00000000000405000294.timeindex.deleted
(kafka.log.TimeIndex)
[2017-09-05 12:30:59,042] WARN [ReplicaFetcherThread-2-193]: Error in fetch
to broker 193, request (type=FetchRequest, replicaId=192, maxWait=500,
minBytes=1, maxBytes=10485760, fetchData={__consumer_offsets-21=(offset=0,
logStartOffset=0, maxBytes=1048576)



This caused the NotEnoughReplicasException, since any appends to the
transaction log require "acks=all" with min.isr equal to the number of replicas.

*[2017-09-05 12:32:11,612] ERROR [Replica Manager on Broker 193]: Error
processing append operation on partition __transaction_state-18
(kafka.server.ReplicaManager)*

*org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
insync replicas for partition __transaction_state-18 is [1], below required
minimum [3]*
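
For context, the replication and ISR requirement of this internal topic come
from dedicated broker settings rather than the per-topic replication factor.
A sketch of the relevant server.properties entries (the names are the standard
broker configs; the values shown are only illustrative, and the error above
suggests this cluster effectively requires all 3 replicas in sync):

# server.properties fragment (illustrative values, not read from this cluster)
# replication of the __transaction_state topic; broker default is 3
transaction.state.log.replication.factor=3
# appends fail with NotEnoughReplicas when the ISR drops below this; broker default is 2
transaction.state.log.min.isr=2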

Upon seeing this error, the transaction coordinator should retry appending,
but if the retry never succeeds it will be blocked. I did not see the
Streams API client-side logs and so cannot tell for sure why this caused
the Streams app to fail as well. A quick question: did you enable
`processing.mode=exactly-once` on your streams app?
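
For reference, the Streams config that turns this on is
`processing.guarantee=exactly_once`; a minimal Java sketch of what enabling it
looks like (the application id and bootstrap servers are placeholders, not
taken from this thread):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");
        // the default is "at_least_once"; setting exactly_once is what enables
        // the transactional producer and hence the writes to __transaction_state
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}

If that line is absent, the app runs with the at-least-once default and would
not itself be writing to the transaction log.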


Guozhang




On Fri, Sep 8, 2017 at 1:34 AM, Sameer Kumar <sa...@gmail.com> wrote:

> Hi All,
>
>
> Any thoughts on the below mail.
>
> -Sameer.
>
> On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sa...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I want to report a scenario wherein my running 2 different instances of
> my
> > stream application caused my brokers to crash and eventually my stream
> > application as well. This scenario only happens when my brokers run on
> > Kafka11, everything works fine if my brokers are on Kafka 10..2 and
> stream
> > application on Kafka11.
> >
> > I am attaching herewith the logs in a zipped format.
> >
> > The cluster configuration
> > 3 nodes(190,192,193) , Kafka 11
> > Topic Replication Factor - 2
> >
> > App configuration
> > Kafka 11 streams.
> >
> >
> > The error I saw on 193 server was org.apache.kafka.common.errors.
> NotEnoughReplicasException:
> > Number of insync replicas for partition __transaction_state-18 is [1],
> > below required minimum [3]. Both 192,190 servers reported errors on
> failure
> > to read information from 193.
> >
> > Please look for the time around 12:30-12:32 to find the relevant logs.
> Let
> > me know if you need some other information.
> >
> >
> > Regards,
> > -Sameer.
> >
>



-- 
-- Guozhang

Re: Kafka 11 | Stream Application crashed the brokers

Posted by Sameer Kumar <sa...@gmail.com>.
Hi All,


Any thoughts on the mail below?

-Sameer.

On Wed, Sep 6, 2017 at 12:28 PM, Sameer Kumar <sa...@gmail.com>
wrote:

> Hi All,
>
> I want to report a scenario wherein my running 2 different instances of my
> stream application caused my brokers to crash and eventually my stream
> application as well. This scenario only happens when my brokers run on
> Kafka11, everything works fine if my brokers are on Kafka 10..2 and stream
> application on Kafka11.
>
> I am attaching herewith the logs in a zipped format.
>
> The cluster configuration
> 3 nodes(190,192,193) , Kafka 11
> Topic Replication Factor - 2
>
> App configuration
> Kafka 11 streams.
>
>
> The error I saw on 193 server was org.apache.kafka.common.errors.NotEnoughReplicasException:
> Number of insync replicas for partition __transaction_state-18 is [1],
> below required minimum [3]. Both 192,190 servers reported errors on failure
> to read information from 193.
>
> Please look for the time around 12:30-12:32 to find the relevant logs. Let
> me know if you need some other information.
>
>
> Regards,
> -Sameer.
>