Posted to users@kafka.apache.org by Jun Rao <ju...@gmail.com> on 2013/07/01 06:32:42 UTC

Re: produce request failed: due to Leader not local for partition

Commented on the jira.

Thanks,

Jun


On Sat, Jun 29, 2013 at 6:21 AM, Jason Rosenberg <jb...@squareup.com> wrote:

> I added this scenario to KAFKA-955.
>
> I'm thinking that this scenario could be a problem for ack=0 in general
> (even without controlled shutdown).  If we do an "uncontrolled" shutdown,
> it seems that some topics won't ever know there could have been a leader
> change.  Would it make sense to force a meta-data refresh for all topics on
> a broker, any time an IOException happens on a socket (e.g. "connection
> reset")?  Currently, it looks like only the topic that experiences the
> failure will have a metadata refresh issued for it.
>
> Maybe this should be a separate jira issue, now that I think about it.
>
> Jason
>
>
> On Mon, Jun 24, 2013 at 10:52 PM, Jason Rosenberg <jb...@squareup.com>
> wrote:
>
> > Also, looking back at my logs, I'm wondering if a producer will reuse the
> > same socket to send data to the same broker, for multiple topics (I'm
> > guessing yes).  In which case, it looks like I'm seeing this scenario:
> >
> > 1. producer1 is happily sending messages for topicX and topicY to serverA
> > (serverA is the leader for both topics, only 1 partition for each topic
> > for simplicity).
> > 2. serverA is restarted, and in the process, serverB becomes the new
> > leader for both topicX and topicY.
> > 3. producer1 decides to send a new message to topicX to serverA.
> > 3a. this results in an exception ("Connection reset by peer").
> >  producer1's connection to serverA is invalidated.
> > 3b. producer1 makes a new metadata request for topicX, and learns that
> > serverB is now the leader for topicX.
> > 3c. producer1 resends the message to topicX, on serverB.
> > 4. producer1 decides to send a new message to topicY to serverA.
> > 4a. producer1 notes that its socket to serverA is invalid, so it creates
> > a new connection to serverA.
> > 4b. producer1 successfully sends its message to serverA (without
> > realizing that serverA is no longer the leader for topicY).
> > 4c. serverA logs to its console:
> > 2013-06-23 08:28:46,770  WARN [kafka-request-handler-2] server.KafkaApis -
> > [KafkaApi-508818741] Produce request with correlation id 7136261 from
> > client  on partition [mytopic,0] failed due to Leader not local for
> > partition [mytopic,0] on broker 508818741
> > 5. producer1 continues to send messages for topicY to serverA, and serverA
> > continues to log the same messages.
> > 6. 10 minutes later, producer1 decides to update its metadata for topicY,
> > and learns that serverB is now the leader for topicY.
> > 7. the warning messages finally stop in the console for serverA.
> >
> > I am pretty sure this scenario, or one very close to it, is what I'm
> > seeing in my logs, after doing a rolling restart, with controlled shutdown.
> >
> > Does this scenario make sense?
> >
> > One thing I notice is that in the steady state, every 10 minutes the
> > producer refreshes its metadata for all topics.  However, when sending a
> > message to a specific topic fails, only the metadata for that topic is
> > refreshed, even though the ramifications should be that all topics which
> > have the same leader might need to be refreshed, especially in response to
> > a "connection reset by peer".
> >
> > Jason
> >
> >
> >
> > On Mon, Jun 24, 2013 at 10:14 PM, Jason Rosenberg <jbr@squareup.com>
> > wrote:
> >
> >> Jun,
> >>
> >> To be clear, this whole discussion was started, because I am clearly
> >> seeing "failed due to Leader not local" on the last broker restarted,
> >> after all the controlled shutting down has completed and all brokers
> >> restarted.
> >>
> >> This leads me to believe that a client made a metadata request and found
> >> out that server A was the leader for its partition, and then server A was
> >> restarted, and then the client makes repeated producer requests to server
> >> A, without encountering a broken socket.  Thus, I'm not sure it's correct
> >> that the socket is invalidated in that case after a restart.
> >>
> >> Alternatively, could it be that the client (which sends messages to
> >> multiple topics), gets metadata updates for multiple topics, but doesn't
> >> attempt to send a message to topicX until after the leader has changed and
> >> server A has been restarted?  In this case, if it's the first time the
> >> producer sends to topicX, does it only then create a new socket?
> >>
> >> Jason
> >>
> >>
> >> On Mon, Jun 24, 2013 at 10:00 PM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> That should be fine since the old socket in the producer will no longer
> >>> be usable after a broker is restarted.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Mon, Jun 24, 2013 at 9:50 PM, Jason Rosenberg <jb...@squareup.com>
> >>> wrote:
> >>>
> >>> > What about a non-controlled shutdown, and a restart, but the producer never
> >>> > attempts to send anything during the time the broker was down?  That could
> >>> > have caused a leader change, but without the producer knowing to refresh
> >>> > its metadata, no?
> >>> >
> >>> >
> >>> > On Mon, Jun 24, 2013 at 9:05 PM, Jun Rao <ju...@gmail.com> wrote:
> >>> >
> >>> > > Other than controlled shutdown, the only other case that can cause the
> >>> > > leader to change when the underlying broker is alive is when the broker
> >>> > > expires its ZK session (likely due to GC), which should be rare. That being
> >>> > > said, forwarding in the broker may not be a bad idea. Could you file a jira
> >>> > > to track this?
> >>> > >
> >>> > > Thanks,
> >>> > >
> >>> > > Jun
> >>> > >
> >>> > >
> >>> > > On Mon, Jun 24, 2013 at 2:50 PM, Jason Rosenberg <jbr@squareup.com>
> >>> > > wrote:
> >>> > >
> >>> > > > Yeah,
> >>> > > >
> >>> > > > I see that with ack=0, the producer will be in a bad state anytime the
> >>> > > > leader for its partition has changed, while the broker that it thinks is
> >>> > > > the leader is still up.  So this is a problem in general, not only for
> >>> > > > controlled shutdown, but even for the case where you've restarted a server
> >>> > > > (without controlled shutdown), which in and of itself can force a leader
> >>> > > > change.  If the producer doesn't attempt to send a message during the time
> >>> > > > the broker was down, it will never get a connection failure, and never get
> >>> > > > fresh metadata, and subsequently start sending messages to the non-leader.
> >>> > > >
> >>> > > > Thus, I'd say this is a problem with ack=0, regardless of controlled
> >>> > > > shutdown.  Any time there's a leader change, the producer will send
> >>> > > > messages into the ether.  I think this is actually a severe condition, that
> >>> > > > could be considered a bug.  How hard would it be to have the receiving
> >>> > > > broker forward on to the leader, in this case?
> >>> > > >
> >>> > > > Jason
> >>> > > >
> >>> > > >
> >>> > > > On Mon, Jun 24, 2013 at 8:44 AM, Joel Koshy <jjkoshy.w@gmail.com>
> >>> > > > wrote:
> >>> > > >
> >>> > > > > I think Jason was suggesting quiescent time as a possibility only if the
> >>> > > > > broker did request forwarding if it is not the leader.
> >>> > > > >
> >>> > > > > On Monday, June 24, 2013, Jun Rao wrote:
> >>> > > > >
> >>> > > > > > Jason,
> >>> > > > > >
> >>> > > > > > The quiescence time that you proposed won't work. The reason is that with
> >>> > > > > > ack=0, the producer starts losing data silently from the moment the leader
> >>> > > > > > is moved (by controlled shutdown) until the broker is shut down. So, the
> >>> > > > > > sooner that you can shut down the broker, the better. What we realized is
> >>> > > > > > that if you can use a larger batch size, ack=1 can still deliver very good
> >>> > > > > > throughput.
> >>> > > > > >
> >>> > > > > > Thanks,
> >>> > > > > >
> >>> > > > > > Jun
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Mon, Jun 24, 2013 at 12:22 AM, Jason Rosenberg <jbr@squareup.com>
> >>> > > > > > wrote:
> >>> > > > > >
> >>> > > > > > > Yeah I am using ack = 0, so that makes sense.  I'll need to rethink that,
> >>> > > > > > > it would seem.  It would be nice, wouldn't it, in this case, for the broker
> >>> > > > > > > to realize this and just forward the messages to the correct leader.  Would
> >>> > > > > > > that be possible?
> >>> > > > > > >
> >>> > > > > > > Also, it would be nice to have a second option to the controlled shutdown
> >>> > > > > > > (e.g. controlled.shutdown.quiescence.ms), to allow the broker to wait
> >>> > > > > > > after the controlled shutdown, a prescribed amount of time before actually
> >>> > > > > > > shutting down the server. Then, I could set this value to something a
> >>> > > > > > > little greater than the producer's 'topic.metadata.refresh.interval.ms'.
> >>> > > > > > > This would help with hitless rolling restarts too.  Currently, every
> >>> > > > > > > producer gets a very loud "Connection Reset" with a tall stack trace each
> >>> > > > > > > time I restart a broker.  Would be nicer to have the producers still be
> >>> > > > > > > able to produce until the metadata refresh interval expires, then get the
> >>> > > > > > > word that the leader has moved due to the controlled shutdown, and then
> >>> > > > > > > start producing to the new leader, all before the shutting down server
> >>> > > > > > > actually shuts down.  Does that seem feasible?
> >>> > > > > > >
> >>> > > > > > > Jason
> >>> > > > > > >
> >>> > > > > > >
> >>> > > > > > > On Sun, Jun 23, 2013 at 8:23 PM, Jun Rao <junrao@gmail.com>
> >>> > > > > > > wrote:
> >>> > > > > > >
> >>> > > > > > > > Jason,
> >>> > > > > > > >
> >>> > > > > > > > Are you using ack = 0 in the producer? This mode doesn't work well with
> >>> > > > > > > > controlled shutdown (this is explained in FAQ in
> >>> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#)
> >>> > > > > > > >
> >>> > > > > > > > Thanks,
> >>> > > > > > > >
> >>> > > > > > > > Jun
> >>> > > > > > > >
> >>> > > > > > > >
> >>> > > > > > > > On Sun, Jun 23, 2013 at 1:45 AM, Jason Rosenberg <jbr@squareup.com>
> >>> > > > > > > > wrote:
> >>> > > > > > > >
> >>> > > > > > > > > I'm working on having seamless rolling restarts for my kafka
> >>> > > > > > > > > servers, running 0.8.  I have it so that each server will be restarted
> >>> > > > > > > > > sequentially.  Each server takes itself out of the load balancer (e.g. sets
> >>> > > > > > > > > a status that the lb will recognize, and then waits more than long enough
> >>> > > > > > > > > for the lb to stop sending meta-data requests to that server).  Then I
> >>> > > > > > > > > initiate the shutdown (with controlled.shutdown.enable=true).  This seems
> >>> > > > > > > > > to work well, however, I occasionally see warnings like this in the log
> >>> > > > > > > > > from the server, after restart:
> >>> > > > > > > > >
> >>> > > > > > > > > 2013-06-23 08:28:46,770  WARN [kafka-request-handler-2] server.KafkaApis -
> >>> > > > > > > > > [KafkaApi-508818741] Produce request with correlation id 7136261 from
> >>> > > > > > > > > client  on partition [mytopic,0] failed due to Leader not local for
> >>> > > > > > > > > partition [mytopic,0] on broker 508818741
> >>> > > > > > > > >
> >>> > > > > > > > > This WARN seems to persistently repeat, until the producer client initiates
> >>> > > > > > > > > a new meta-data request (e.g. every 10 minutes, by default).  However, the
> >>> > > > > > > > > producer doesn't log any errors/exceptions when the server is logging this
> >>> > > > > > > > > WARN.
> >>> > > > > > > > >
> >>> > > > > > > > > What's happening here?  Is the message silently being forwarded on to the
> >>> > > > > > > > > correct leader for the partition?  Is the message dropped?  Are these WARNS
> >>> > > > > > > > > particularly useful?
> >>> > > > > > > > >
> >>> > > > > > > > > Thanks,
> >>> > > > > > > > >
> >>> > > > > > > > > Jason
> >>> > > > > > > > >
> >>> > > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
> >>
> >
>
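
Note on the scenario in steps 1 to 7 quoted above: the following is a minimal Python sketch of it, offered only as a toy model and not as Kafka's producer code. Everything in it is an assumption made up for illustration: the Cluster and Producer classes, the refresh_all_on_error flag, and the broken_sockets set. It models an ack=0 producer that caches one leader per topic, and compares the behavior described in the thread (refresh only the topic whose send failed) with the proposed alternative (refresh every topic cached against the broker whose socket was reset).

```python
from dataclasses import dataclass, field


@dataclass
class Cluster:
    """Ground truth: which broker currently leads each topic."""
    leaders: dict


@dataclass
class Producer:
    """Toy ack=0 producer with a per-topic leader cache (hypothetical model)."""
    cluster: Cluster
    refresh_all_on_error: bool
    cached: dict = field(default_factory=dict)        # topic -> believed leader
    broken_sockets: set = field(default_factory=set)  # brokers whose socket died
    lost: list = field(default_factory=list)          # silently dropped messages

    def refresh(self, topics):
        """Metadata request: update the cached leader for the given topics."""
        for topic in topics:
            self.cached[topic] = self.cluster.leaders[topic]

    def send(self, topic, msg):
        broker = self.cached[topic]
        if broker in self.broken_sockets:
            # Steps 3a-3c: connection reset, socket replaced, metadata
            # refreshed, message resent to whatever the cache now says.
            self.broken_sockets.discard(broker)
            if self.refresh_all_on_error:
                stale = [t for t, b in self.cached.items() if b == broker]
            else:
                stale = [topic]
            self.refresh(stale)
            broker = self.cached[topic]
        if self.cluster.leaders[topic] != broker:
            # Steps 4b-4c: the non-leader logs "Leader not local"; with
            # ack=0 the producer gets no response and the message is lost.
            self.lost.append((topic, msg))
            return False
        return True


def simulate(refresh_all_on_error):
    """Replay the restart scenario and return the messages that were lost."""
    cluster = Cluster({"topicX": "serverA", "topicY": "serverA"})
    producer = Producer(cluster, refresh_all_on_error)
    producer.refresh(["topicX", "topicY"])
    # Step 2: serverA restarts and serverB takes over both topics.
    cluster.leaders = {"topicX": "serverB", "topicY": "serverB"}
    producer.broken_sockets.add("serverA")
    producer.send("topicX", "x1")
    producer.send("topicY", "y1")
    producer.send("topicY", "y2")
    return producer.lost


if __name__ == "__main__":
    print("per-topic refresh lost: ", simulate(False))
    print("per-broker refresh lost:", simulate(True))
```

In this model the per-topic policy drops both topicY messages, because the reset was consumed by the topicX send and the cache entry for topicY still points at serverA. The per-broker policy drops none. The sketch does not cover the other case raised in the thread, where the producer sends nothing while the broker is down and therefore never sees a reset at all.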