Posted to users@kafka.apache.org by Kafka <ka...@126.com> on 2016/09/22 10:56:18 UTC

Can Kafka 0.9 guarantee no data loss?

Hi all,
    On the topic side, we created a topic with 6 partitions, each with 3 replicas.
    On the producer side, we send messages with acks = -1 using the sync interface.
    On the broker side, we set min.insync.replicas to 2.
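
For reference, a minimal sketch of this setup, assuming the new Java producer client is used (the post does not say which client). The object name ProducerSetup and the bootstrap address passed in are hypothetical; the keys themselves are the standard producer and topic config names.

```scala
import java.util.Properties

// Hypothetical helper object, for illustration only.
object ProducerSetup {
  // Producer-side settings; "all" is the string form of acks = -1.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("acks", "all")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  // Topic-level (or broker default) setting described in the post.
  val topicConfig: Map[String, String] = Map("min.insync.replicas" -> "2")
}
```

With the new producer, a synchronous send is usually done by blocking on the returned future, e.g. producer.send(record).get().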

After reviewing the Kafka broker's code, we understand that when we send a message to the broker with acks = -1, we get a response once the ISR size of the partition is greater than or equal to min.insync.replicas. What confuses
me is that the replicas in the ISR are not strongly consistent. Kafka 0.9 uses the replica.lag.time.max.ms parameter to decide whether to shrink the ISR, and its default is 10000 ms, so the data of a replica in the ISR can lag by up to 10000 ms.
If we restart the broker that owns the partition's leader, the controller starts a new leader election, which chooses the first replica in the ISR that is not the current leader as the new leader, and this would lose data.


The main produce-handling code is shown below:
val numAcks = curInSyncReplicas.count(r => {
  if (!r.isLocal)
    if (r.logEndOffset.messageOffset >= requiredOffset) {
      trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
      true
    }
    else
      false
  else
    true /* also count the local (leader) replica */
})

trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
  /*
   * The topic may be configured not to accept messages if there are not enough replicas in ISR
   * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
   */
  if (minIsr <= curInSyncReplicas.size) {
    (true, ErrorMapping.NoError)
  } else {
    (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
  }
} else
  (false, ErrorMapping.NoError)


Why does the code only log numAcks instead of comparing it with minIsr? If numAcks is 0 but curInSyncReplicas.size >= minIsr, this will return. Since the ISR shrink procedure is not real time, will this lose data after a leader election?

Feedback is greatly appreciated. Thanks.
meituan.inf




Re: Can Kafka 0.9 guarantee no data loss?

Posted by Kafka <ka...@126.com>.
Oh, please ignore my last reply.
I found that leaderReplica.highWatermark.messageOffset >= requiredOffset already ensures that the LEO (log end offset) of every replica in curInSyncReplicas is >= requiredOffset.
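
A minimal sketch of why this holds, under the assumption that the leader's high watermark is the smallest log end offset among the replicas in the ISR. ReplicaState and IsrInvariant are hypothetical names for a simplified model, not the broker's real classes.

```scala
// Simplified model of a replica; not the broker's real Replica class.
final case class ReplicaState(brokerId: Int, logEndOffset: Long, isLocal: Boolean)

object IsrInvariant {
  // Assumption: the high watermark is the smallest LEO in the ISR.
  // The ISR always contains the leader, so it is assumed to be non-empty.
  def highWatermark(isr: Seq[ReplicaState]): Long =
    isr.map(_.logEndOffset).min

  // Same counting rule as the quoted code: the local (leader) replica always counts,
  // a follower counts only once its LEO has reached requiredOffset.
  def numAcks(isr: Seq[ReplicaState], requiredOffset: Long): Int =
    isr.count(r => r.isLocal || r.logEndOffset >= requiredOffset)
}
```

In this model, with LEOs of 100 (leader), 90 and 100 and a requiredOffset of 100, the high watermark is 90, so the response is held back even though numAcks is already 2. Once the lagging follower reaches 100, the high watermark is 100 and numAcks equals the ISR size, so checking curInSyncReplicas.size against minIsr gives the same result as checking numAcks.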

> On Sep 23, 2016, at 3:39 PM, Kafka <ka...@126.com> wrote:
> 
> OK, the example before is not enough to exposure problem.
> What will happen to the situation under the numAcks is 1,and curInSyncReplica.size >= minIsr,but in fact the replica in curInSyncReplica only have one replica has caught up to leader,
> and this replica is the leader replica itself,this is not safe when the machine that deploys leader partition’s broker is restart. 
> 
> current code is as belows,
> if (minIsr <= curInSyncReplicas.size) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
> 
> why not the code as belows,
> if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
> 
> Its seems that only one condition in kafka broker’s code is not enough to ensure safe,because replicas in curInSyncReplicas is not Strong synchronization.
> 
>> On Sep 23, 2016, at 1:45 PM, Becket Qin <be...@gmail.com> wrote:
>> 
>> In order to satisfy a produce response, there are two conditions:
>> A. The leader's high watermark should be higher than the requiredOffset
>> (max offset in that produce request of that partition)
>> B. The number of in sync replica is greater than min.isr.
>> 
>> The ultimate goal here is to make sure at least min.isr number of replicas
>> has caught up to requiredOffset. So the check is not only whether we have
>> enough number of replicas in the isr, but also whether those replicas in
>> the ISR has caught up to the required offset.
>> 
>> In your example, if numAcks is 0 and curInSyncReplica.size >= minIsr, the
>> produce response won't return if min.isr > 0, because
>> leaderReplica.highWatermark must be less than requiredOffset given the fact
>> that numAcks is 0. i.e. condition A is not met.
>> 
>> We are actually even doing a stronger than necessary check here.
>> Theoretically as long as min.isr number of replicas has caught up to
>> requiredOffset, we should be able to return the response, but we also
>> require those replicas to be in the ISR.
>> 
>> On Thu, Sep 22, 2016 at 8:15 PM, Kafka <ka...@126.com> wrote:
>> 
>>> @wangguozhang,could you give me some advices.
>>> 
>>>> On Sep 22, 2016, at 6:56 PM, Kafka <ka...@126.com> wrote:
>>>> 
>>>> Hi all,
>>>>     in terms of topic, we create a topic with 6 partition,and each
>>> with 3 replicas.
>>>>      in terms of producer,when we send message with ack -1 using sync
>>> interface.
>>>>     in terms of brokers,we set min.insync.replicas to 2.
>>>> 
>>>> after we review the kafka broker’s code,we know that we send a message
>>> to broker with ack -1, then we can get response if ISR of this partition is
>>> great than or equal to min.insync.replicas,but what confused
>>>> me is replicas in ISR is not strongly consistent,in kafka 0.9 we use
>>> replica.lag.time.max.ms param to judge whether to shrink ISR, and the
>>> defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
>>>> we we restart broker which own this partitions’ leader, then controller
>>> will start a new leader election, which will choose the first replica in
>>> ISR that not equals to current leader as new leader, then this will loss
>>> data.
>>>> 
>>>> 
>>>> The main produce handle code shows below:
>>>> val numAcks = curInSyncReplicas.count(r => {
>>>>        if (!r.isLocal)
>>>>          if (r.logEndOffset.messageOffset >= requiredOffset) {
>>>>            trace("Replica %d of %s-%d received offset
>>> %d".format(r.brokerId, topic, partitionId, requiredOffset))
>>>>            true
>>>>          }
>>>>          else
>>>>            false
>>>>        else
>>>>          true /* also count the local (leader) replica */
>>>>      })
>>>> 
>>>>      trace("%d acks satisfied for %s-%d with acks =
>>> -1".format(numAcks, topic, partitionId))
>>>> 
>>>>      val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>>>> 
>>>>      if (leaderReplica.highWatermark.messageOffset >= requiredOffset
>>> ) {
>>>>        /*
>>>>        * The topic may be configured not to accept messages if there
>>> are not enough replicas in ISR
>>>>        * in this scenario the request was already appended locally and
>>> then added to the purgatory before the ISR was shrunk
>>>>        */
>>>>        if (minIsr <= curInSyncReplicas.size) {
>>>>          (true, ErrorMapping.NoError)
>>>>        } else {
>>>>          (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>>>>        }
>>>>      } else
>>>>        (false, ErrorMapping.NoError)
>>>> 
>>>> 
>>>> why only logging unAcks and not use numAcks to compare with minIsr, if
>>> numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return,
>>> as ISR shrink procedure is not real time, does this will loss data after
>>> leader election?
>>>> 
>>>> Feedback is greatly appreciated. Thanks.
>>>> meituan.inf
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
> 
> 



Re: Can Kafka 0.9 guarantee no data loss?

Posted by Kafka <ka...@126.com>.
OK, the previous example was not enough to expose the problem.
What happens when numAcks is 1 and curInSyncReplicas.size >= minIsr, but in fact only one replica in curInSyncReplicas has caught up with the leader,
and that replica is the leader itself? This is not safe if the machine hosting the partition leader's broker is restarted.

The current code is as follows:
if (minIsr <= curInSyncReplicas.size) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

Why not write it like this instead:
if (minIsr <= curInSyncReplicas.size && minIsr <= numAcks) {
  (true, ErrorMapping.NoError)
} else {
  (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
}

It seems that the single condition in the Kafka broker's code is not enough to ensure safety, because the replicas in curInSyncReplicas are not strongly synchronized.

> On Sep 23, 2016, at 1:45 PM, Becket Qin <be...@gmail.com> wrote:
> 
> In order to satisfy a produce response, there are two conditions:
> A. The leader's high watermark should be higher than the requiredOffset
> (max offset in that produce request of that partition)
> B. The number of in sync replica is greater than min.isr.
> 
> The ultimate goal here is to make sure at least min.isr number of replicas
> has caught up to requiredOffset. So the check is not only whether we have
> enough number of replicas in the isr, but also whether those replicas in
> the ISR has caught up to the required offset.
> 
> In your example, if numAcks is 0 and curInSyncReplica.size >= minIsr, the
> produce response won't return if min.isr > 0, because
> leaderReplica.highWatermark must be less than requiredOffset given the fact
> that numAcks is 0. i.e. condition A is not met.
> 
> We are actually even doing a stronger than necessary check here.
> Theoretically as long as min.isr number of replicas has caught up to
> requiredOffset, we should be able to return the response, but we also
> require those replicas to be in the ISR.
> 
> On Thu, Sep 22, 2016 at 8:15 PM, Kafka <ka...@126.com> wrote:
> 
>> @wangguozhang,could you give me some advices.
>> 
>>> On Sep 22, 2016, at 6:56 PM, Kafka <ka...@126.com> wrote:
>>> 
>>> Hi all,
>>>      in terms of topic, we create a topic with 6 partition,and each
>> with 3 replicas.
>>>       in terms of producer,when we send message with ack -1 using sync
>> interface.
>>>      in terms of brokers,we set min.insync.replicas to 2.
>>> 
>>> after we review the kafka broker’s code,we know that we send a message
>> to broker with ack -1, then we can get response if ISR of this partition is
>> great than or equal to min.insync.replicas,but what confused
>>> me is replicas in ISR is not strongly consistent,in kafka 0.9 we use
>> replica.lag.time.max.ms param to judge whether to shrink ISR, and the
>> defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
>>> we we restart broker which own this partitions’ leader, then controller
>> will start a new leader election, which will choose the first replica in
>> ISR that not equals to current leader as new leader, then this will loss
>> data.
>>> 
>>> 
>>> The main produce handle code shows below:
>>> val numAcks = curInSyncReplicas.count(r => {
>>>         if (!r.isLocal)
>>>           if (r.logEndOffset.messageOffset >= requiredOffset) {
>>>             trace("Replica %d of %s-%d received offset
>> %d".format(r.brokerId, topic, partitionId, requiredOffset))
>>>             true
>>>           }
>>>           else
>>>             false
>>>         else
>>>           true /* also count the local (leader) replica */
>>>       })
>>> 
>>>       trace("%d acks satisfied for %s-%d with acks =
>> -1".format(numAcks, topic, partitionId))
>>> 
>>>       val minIsr = leaderReplica.log.get.config.minInSyncReplicas
>>> 
>>>       if (leaderReplica.highWatermark.messageOffset >= requiredOffset
>> ) {
>>>         /*
>>>         * The topic may be configured not to accept messages if there
>> are not enough replicas in ISR
>>>         * in this scenario the request was already appended locally and
>> then added to the purgatory before the ISR was shrunk
>>>         */
>>>         if (minIsr <= curInSyncReplicas.size) {
>>>           (true, ErrorMapping.NoError)
>>>         } else {
>>>           (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>>>         }
>>>       } else
>>>         (false, ErrorMapping.NoError)
>>> 
>>> 
>>> why only logging unAcks and not use numAcks to compare with minIsr, if
>> numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return,
>> as ISR shrink procedure is not real time, does this will loss data after
>> leader election?
>>> 
>>> Feedback is greatly appreciated. Thanks.
>>> meituan.inf
>>> 
>>> 
>>> 
>> 
>> 
>> 



Re: Can Kafka 0.9 guarantee no data loss?

Posted by Becket Qin <be...@gmail.com>.
In order to satisfy a produce response, there are two conditions:
A. The leader's high watermark must be greater than or equal to requiredOffset
(the max offset in that produce request for that partition).
B. The number of in-sync replicas must be greater than or equal to min.isr.

The ultimate goal here is to make sure at least min.isr replicas
have caught up to requiredOffset. So the check is not only whether we have
enough replicas in the ISR, but also whether those replicas in
the ISR have caught up to the required offset.

In your example, if numAcks is 0 and curInSyncReplicas.size >= minIsr, the
produce response won't return if min.isr > 0, because
leaderReplica.highWatermark must be less than requiredOffset given the fact
that numAcks is 0, i.e. condition A is not met.

We are actually doing an even stronger check than necessary here.
Theoretically, as long as min.isr replicas have caught up to
requiredOffset, we should be able to return the response, but we also
require those replicas to be in the ISR.
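
The two conditions can be sketched as a small decision function. This is an illustrative model only, with hypothetical names (ProduceCheck, ProduceConditions) and plain numbers in place of the broker's replica objects; the three outcomes mirror the three branches of the quoted code.

```scala
// Possible outcomes of the check, named after the branches in the quoted code.
sealed trait ProduceCheck
case object KeepWaiting extends ProduceCheck                  // condition A not met yet
case object Acknowledged extends ProduceCheck                 // A and B both met
case object NotEnoughReplicasAfterAppend extends ProduceCheck // A met, B not met

object ProduceConditions {
  // Condition A: the high watermark has reached requiredOffset.
  // Condition B: the ISR still has at least minIsr members.
  def check(highWatermark: Long, requiredOffset: Long, isrSize: Int, minIsr: Int): ProduceCheck =
    if (highWatermark < requiredOffset) KeepWaiting
    else if (isrSize >= minIsr) Acknowledged
    else NotEnoughReplicasAfterAppend
}
```

For example, with min.isr = 2, a high watermark of 90 against a requiredOffset of 100 keeps the request waiting regardless of the ISR size, while a high watermark of 100 with only one replica left in the ISR completes the request with the not-enough-replicas error.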

On Thu, Sep 22, 2016 at 8:15 PM, Kafka <ka...@126.com> wrote:

> @wangguozhang,could you give me some advices.
>
> > On Sep 22, 2016, at 6:56 PM, Kafka <ka...@126.com> wrote:
> >
> > Hi all,
> >       in terms of topic, we create a topic with 6 partition,and each
> with 3 replicas.
> >        in terms of producer,when we send message with ack -1 using sync
> interface.
> >       in terms of brokers,we set min.insync.replicas to 2.
> >
> > after we review the kafka broker’s code,we know that we send a message
> to broker with ack -1, then we can get response if ISR of this partition is
> great than or equal to min.insync.replicas,but what confused
> > me is replicas in ISR is not strongly consistent,in kafka 0.9 we use
> replica.lag.time.max.ms param to judge whether to shrink ISR, and the
> defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
> > we we restart broker which own this partitions’ leader, then controller
> will start a new leader election, which will choose the first replica in
> ISR that not equals to current leader as new leader, then this will loss
> data.
> >
> >
> > The main produce handle code shows below:
> > val numAcks = curInSyncReplicas.count(r => {
> >          if (!r.isLocal)
> >            if (r.logEndOffset.messageOffset >= requiredOffset) {
> >              trace("Replica %d of %s-%d received offset
> %d".format(r.brokerId, topic, partitionId, requiredOffset))
> >              true
> >            }
> >            else
> >              false
> >          else
> >            true /* also count the local (leader) replica */
> >        })
> >
> >        trace("%d acks satisfied for %s-%d with acks =
> -1".format(numAcks, topic, partitionId))
> >
> >        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> >
> >        if (leaderReplica.highWatermark.messageOffset >= requiredOffset
> ) {
> >          /*
> >          * The topic may be configured not to accept messages if there
> are not enough replicas in ISR
> >          * in this scenario the request was already appended locally and
> then added to the purgatory before the ISR was shrunk
> >          */
> >          if (minIsr <= curInSyncReplicas.size) {
> >            (true, ErrorMapping.NoError)
> >          } else {
> >            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
> >          }
> >        } else
> >          (false, ErrorMapping.NoError)
> >
> >
> > why only logging unAcks and not use numAcks to compare with minIsr, if
> numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return,
> as ISR shrink procedure is not real time, does this will loss data after
> leader election?
> >
> > Feedback is greatly appreciated. Thanks.
> > meituan.inf
> >
> >
> >
>
>
>

Re: Can Kafka 0.9 guarantee no data loss?

Posted by Kafka <ka...@126.com>.
@wangguozhang, could you give me some advice?

> On Sep 22, 2016, at 6:56 PM, Kafka <ka...@126.com> wrote:
> 
> Hi all,	
> 	in terms of topic, we create a topic with 6 partition,and each with 3 replicas.
>        in terms of producer,when we send message with ack -1 using sync interface.
> 	in terms of brokers,we set min.insync.replicas to 2.
> 
> after we review the kafka broker’s code,we know that we send a message to broker with ack -1, then we can get response if ISR of this partition is great than or equal to min.insync.replicas,but what confused
> me is replicas in ISR is not strongly consistent,in kafka 0.9 we use replica.lag.time.max.ms param to judge whether to shrink ISR, and the defaults is 10000 ms, so replicas’ data in isr can lag 10000ms at most,
> we we restart broker which own this partitions’ leader, then controller will start a new leader election, which will choose the first replica in ISR that not equals to current leader as new leader, then this will loss data.
> 
> 
> The main produce handle code shows below:
> val numAcks = curInSyncReplicas.count(r => {
>          if (!r.isLocal)
>            if (r.logEndOffset.messageOffset >= requiredOffset) {
>              trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
>              true
>            }
>            else
>              false
>          else
>            true /* also count the local (leader) replica */
>        })
> 
>        trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
> 
>        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> 
>        if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
>          /*
>          * The topic may be configured not to accept messages if there are not enough replicas in ISR
>          * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
>          */
>          if (minIsr <= curInSyncReplicas.size) {
>            (true, ErrorMapping.NoError)
>          } else {
>            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
>          }
>        } else
>          (false, ErrorMapping.NoError)
> 
> 
> why only logging unAcks and not use numAcks to compare with minIsr, if numAcks is 0, but curInSyncReplicas.size >= minIsr, then this will return, as ISR shrink procedure is not real time, does this will loss data after leader election?
> 
> Feedback is greatly appreciated. Thanks.
> meituan.inf
> 
> 
> 


