Posted to dev@kafka.apache.org by "Jun Rao (JIRA)" <ji...@apache.org> on 2012/05/30 22:38:23 UTC

[jira] [Created] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Jun Rao created KAFKA-353:
-----------------------------

             Summary: tie producer-side ack with high watermark and progress of replicas
                 Key: KAFKA-353
                 URL: https://issues.apache.org/jira/browse/KAFKA-353
             Project: Kafka
          Issue Type: Sub-task
    Affects Versions: 0.8
            Reporter: Jun Rao




--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13401447#comment-13401447 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

3.4 I overlooked that watcher.collectSatisfiedRequests is synchronized. So, this is not an issue.
                
> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch
>
>



        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13404165#comment-13404165 ] 

Joel Koshy commented on KAFKA-353:
----------------------------------

Added that comment on requiredAcks < 0.

That's a good point about inSyncReplicas. Looks like this is used incorrectly from other places, so I would prefer doing that (and the additional synchronization for expire/checkSatisfied) in a separate jira if that's okay with others.

                
> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch, kafka-353_v2.patch, kafka-353_v2_to_v3.patch, kafka-353_v3.patch
>
>



        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13402802#comment-13402802 ] 

Jay Kreps commented on KAFKA-353:
---------------------------------

The guard is checked before the call to expire. The invariant here is that everyone must do a test-and-set on the satisfied variable and proceed with handling the request only if the test-and-set succeeds. I don't think there is any problem with checkSatisfied being called on an item that is going to be expired; it is only a problem if we try to process the request twice (the call to checkSatisfied or expire will end up being redundant work, but that is not a major problem).
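
A minimal sketch of the test-and-set invariant described above, assuming the guard is an AtomicBoolean. The class and method names (GuardedRequest, tryClaim, GuardDemo) are hypothetical and are not taken from the patch; the point is only that whichever path wins the compare-and-set handles the request, and the other path does redundant but harmless work.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a delayed request held in the purgatory.
class GuardedRequest {
  private val satisfied = new AtomicBoolean(false)

  // Test-and-set: returns true for exactly one caller, which then
  // owns the job of responding to the request.
  def tryClaim(): Boolean = satisfied.compareAndSet(false, true)
}

object GuardDemo {
  // Simulates the satisfaction path and the expiration path both
  // reaching the same request, and counts how many get to respond.
  def respondCount(): Int = {
    val request = new GuardedRequest
    var responses = 0
    if (request.tryClaim()) responses += 1 // satisfaction path wins
    if (request.tryClaim()) responses += 1 // expiration path finds the guard set
    responses
  }
}
```

With this shape, a late call to expire after a successful check (or the reverse) cannot cause the request to be processed twice.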
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13401069#comment-13401069 ] 

Jay Kreps commented on KAFKA-353:
---------------------------------

Will respond to other points, but the intention was for the timeout to be in milliseconds. It is only 4 bytes because an infinite timeout can be specified by -1 and Int.MaxValue is already a lot of milliseconds.
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13286050#comment-13286050 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

We need to support different levels of acks required by the producer. By default (ack=-1), the response to the producer request is sent only when every replica in the ISR has received the message. If a producer specifies ack >= 0 (0 will be treated the same as 1), the response is sent after that many replicas have received the message. The caveat is that if the specified ack is less than the ISR size, the produced message could be lost if some broker fails.
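
The ack levels above could be resolved to a replica count roughly as follows. This is only an illustrative sketch: the object and method names (AckLevels, acksToWaitFor) are made up for the example and do not come from the patch.

```scala
object AckLevels {
  // Number of replicas that must have the message before the
  // producer response is sent, following the rules in this comment.
  def acksToWaitFor(requiredAcks: Int, isrSize: Int): Int =
    if (requiredAcks < 0) isrSize      // default: every replica in the ISR
    else math.max(requiredAcks, 1)     // 0 is treated the same as 1
}
```

For example, with an ISR of three replicas, ack=-1 waits for all three, while ack=1 responds after a single replica has the message, which is where the durability caveat comes from.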
                

        

[jira] [Assigned] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy reassigned KAFKA-353:
--------------------------------

    Assignee: Joel Koshy
    

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400690#comment-13400690 ] 

Jay Kreps commented on KAFKA-353:
---------------------------------

Also, another thought. This implementation is keeping a Map[(topic, partition)] => # acks. I wonder if it would be better to do this in terms of the acknowledged hw. Basically keep a global structure as part of replica manager that tracks the position of all replicas for all partitions. Then the isSatisfied check would just consult this structure to see if the replicas had advanced far enough. This seems like it would not require a per-request map of state and would be less fragile. Somehow this structure seems generally useful for monitoring and other purposes as well.
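
To make the idea concrete, here is a rough sketch of such a global structure, assuming replica positions are tracked as plain offsets per (topic, partition). Everything named here (ReplicaPositions, record, replicasAtOrBeyond) is hypothetical; it is not the replica manager's actual API.

```scala
import scala.collection.concurrent.TrieMap

object ReplicaPositions {
  type TopicPartition = (String, Int)

  // (topic, partition) -> (replicaId -> offset the replica has fetched up to)
  private val positions = TrieMap.empty[TopicPartition, TrieMap[Int, Long]]

  // Called whenever a follower fetch reveals a replica's new position.
  def record(tp: TopicPartition, replicaId: Int, offset: Long): Unit =
    positions.getOrElseUpdate(tp, TrieMap.empty[Int, Long]).update(replicaId, offset)

  // How many replicas have advanced to at least requiredOffset.
  // An isSatisfied-style check can compare this with the needed ack count.
  def replicasAtOrBeyond(tp: TopicPartition, requiredOffset: Long): Int =
    positions.get(tp).map(_.values.count(_ >= requiredOffset)).getOrElse(0)
}
```

A delayed request then only needs to remember the offset it is waiting for; the ack count is derived from the shared structure on demand.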
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400956#comment-13400956 ] 

Joel Koshy commented on KAFKA-353:
----------------------------------

Jun/Jay, thanks a lot for the very helpful reviews. Here are follow-ups to
some of your comments. I want to make sure I fully address them in v2,
so let me know if I missed anything. One major thing to decide is what to do
on a leader change during a delayedProduce: should it be handled
immediately, or should we let the request run to its full timeout?

> 1. TopicData.partitionDatas: data is the plural form of datum. So datas
> feels weird. How about partitionDataArray? 

That works - no wonder 'datas' sounded odd. :)

> 3. DelayedProduce: 
> 3.1 isSatisfied(): need to handle the case when requiredAcks is default
> (probably any value <0). This means whenever we get the ack from every
> replica in the current ISR, the request is satisfied. This can be done by
> simply making sure leader.HW >=
> delayedProduce.partitionStatus(followerFetchPartition).localOffset. 

Thanks for pointing that out - forgot to handle < 0.

> 3.4 We need to be aware that isSatisfied can be called concurrently on the
> same DelayedProduce object. I am not sure if it's really necessary, but
> maybe we should consider using AtomicBoolean for
> PartitionStatus.acksPending? 

I don't think isSatisfied can be called concurrently on the same
DelayedProduce - the ProduceRequestPurgatory's watcher map is from key ->
Watchers. The isSatisfied method is synchronized (through
collectSatisfiedRequests) on Watchers. I do access partitionStatus for
respond(), but that's after the delayedProduce has been kicked out of the
purgatory. Or am I missing some other scenario? If so, can you elaborate?

> 4. ProducerRequest: ackTimeoutSecs should be ackTimeoutMs.

Forgot to bring this up in the v1 patch - it was simply named ackTimeout and
treated as seconds (i.e., the producer request object uses an Int instead of
Long) - that's why I added the secs suffix and left it as Int.  Will switch
it to use long and ms instead.

> 5. ReplicaManager: I would vote for not checking ProducerRequestPurgatory
> when there is a leadership change, since the logic is simpler. We just let
> the producer requests time out. If we do want to do the check here, we
> should share the ProducerRequestPurgatory object used in KafkaApis. 

Ok - that's also fine, and the code comment indicated it was optional. Not
doing the check should help significantly with the code structure. Right now
there is a circular dependency which forced me to put DelayedProduce and
ProduceRequestPurgatory outside KafkaApis and pass in KafkaApis to
DelayedProduce - can probably lay it out differently if we want to do this
check later, but for now this makes things easier.

> 6. SyncProducerConfigShared: We should probably make the default
> requiredAcks to be -1 (i.e., wait for ack from each replica in ISR). 

Ok - either way sounds good. The existing default is (almost) the same
behavior as 0.7 except that the producer response is sent after the message
is persisted at the leader.

> 1. There is a race condition between adding to the purgatory and
> acknowledgement, I think. We first produce locally and then add the
> watcher, which allows a window where the message is available for
> replication with no watcher on ack. If a replica acknowledges a given
> produce before the DelayedProduce is added as a watcher that will not be
> recorded, I think, and the request will potentially time out waiting for
> ISR acks. This would certainly happen and lead to sporadic timeouts. This
> is tricky, let's discuss. One solution would be to reverse the logic--add
> the watcher first then produce it locally, but this means having to handle
> the case where the local produce fails (we don't want to leak watchers). 

> The race condition that Jay raised seems like a potential issue. However,
> it may be tricky to add the watcher before writing to the local log in the
> leader. This is because the watcher needs to know the end offset of
> messages in the leader's log. Another way to address the race condition is
> to still add messages to leader's log first, followed by adding the
> watcher. However, after the watcher is added, we can explicitly trigger a
> watcher check by calling ProducerRequestPurgatory.update.

Excellent points - will think about how best to handle this.
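
One way to picture the second suggestion quoted above (append to the leader's log first, add the watcher, then trigger a check explicitly) is the toy model below. It is a sketch under simplifying assumptions: a single follower, one lock, and hypothetical names (ToyPurgatory, onFollowerFetch, watchThenCheck) that do not correspond to the real purgatory API.

```scala
import scala.collection.mutable

// Toy purgatory for a single partition with a single follower.
class ToyPurgatory {
  // Required offsets of the produce requests still waiting for an ack.
  private val watchers = mutable.ListBuffer.empty[Long]
  private var followerOffset = 0L

  // A follower fetch advances the replica position and releases any
  // waiting requests it covers; returns how many were released.
  def onFollowerFetch(offset: Long): Int = synchronized {
    followerOffset = math.max(followerOffset, offset)
    release()
  }

  // Add the watcher, then check right away in case the follower
  // already fetched past requiredOffset before the watcher existed.
  def watchThenCheck(requiredOffset: Long): Boolean = synchronized {
    watchers += requiredOffset
    release() > 0
  }

  private def release(): Int = {
    val done = watchers.filter(_ <= followerOffset)
    watchers --= done
    done.size
  }
}
```

Without the immediate check in watchThenCheck, a request whose ack arrived in the window between the local append and the watcher registration would sit in the purgatory until it timed out.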

> 2. As you say, you definitely can't pass KafkaApis to anything; making
> DelayedProduceRequest an inner class of KafkaApis will fix this, I think.

Right - that's actually what I started out with (i.e., the purgatory and
delayed request inside kafkaApis), but as I described above, there is a
circular dependency and I was forced to move it out. However, the conclusion
seems to be that we can do away with purgatory update on becoming follower
so we can break this dependency loop.

> 3. I think the way we are tying the api handling and replica manager is
> not good. replica manager shouldn't know about DelayedProduce requests if
> possible, it should only know about replica management and be oblivious to
> the api level. 

Covered above (2).

> 4. Why is the ack timeout in seconds? It should be milliseconds like
> everything else, right? One would commonly want a sub-second timeout. 

Yes - it should be ms. (described above)

> 5. As a stylistic thing it would be good to avoid direct use of
> Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of
> myMethod: List[Tuple2[A,B]] 

Sounds good.
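
For reference, the two spellings in point 5 denote the same type, so the change is purely stylistic. The method names below are invented for the illustration.

```scala
object TupleStyle {
  // Explicit Tuple2 form that the review suggests avoiding.
  def pairsVerbose(topics: List[String]): List[Tuple2[String, Int]] =
    topics.map(t => Tuple2(t, 0))

  // Equivalent tuple sugar; (String, Int) is the same type as Tuple2[String, Int].
  def pairs(topics: List[String]): List[(String, Int)] =
    topics.map(t => (t, 0))
}
```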

> 6. I don't understand why we are passing (topic, partition) to the
> purgatory as BOTH the key and request. This is a bit hacky since this is
> not the request. Let's discuss, we may need to clean up the purgatory api
> to make it gracefully handle this case. 

It does seem a bit unintuitive on the surface, but I think it is correct:
i.e., the produce request is to a set of (topic, partition) and potentially
unblocks DelayedFetch requests to those (topic, partition).

> 7. I think the ProducerRequestPurgatory should be instantiated in
> KafkaApis not KafkaServer--it is an implementation detail of the api layer
> not a major subsystem. 

Covered above (2).

> 8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a
> little bit tricky could you clean that up or if that is not possible
> comment what it does (I think it just makes a list of (topic, partition)
> pairs, but the variable name doesn't explain it). Same with
> KafkaApis.handleFetchRequest. 

That is correct - will rename/comment.
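
As an illustration of what such a cleanup might look like, a for-comprehension with a descriptive name can replace a nested map/flatMap. The case classes here are simplified, hypothetical stand-ins for the request structs and only borrow the partitionDataArray name from point 1 above.

```scala
object TopicPartitions {
  // Simplified stand-ins for the request structs (hypothetical).
  final case class PartitionData(partition: Int)
  final case class TopicData(topic: String, partitionDataArray: Seq[PartitionData])

  // Flattens the request into one (topic, partition) pair per partition.
  def topicPartitionPairs(data: Seq[TopicData]): Seq[(String, Int)] =
    for {
      topicData     <- data
      partitionData <- topicData.partitionDataArray
    } yield (topicData.topic, partitionData.partition)
}
```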

> 9. DelayedProduce should definitely not be in the kafka.api package, that
> package is for request/response "structs". DelayedProduce is basically a
> bunch of server internals and not something the producer or consumer
> should be aware of. Likewise, the code for
> ProduceRequestPurgatory/FetchRequestPurgatory is kept differently, but
> these things are mirror images of one another. FetchRequestPurgatory is in
> KafkaApis, ProduceRequestPurgatory in its own file. These should match
> each other. The reasoning behind keeping things in KafkaApis was that
> those classes contained part of the logic for processing a request, so
> splitting it into two files makes it harder to read (the counter argument
> is that KafkaApis is getting pretty big). The other reason was so it could
> access the variables in KafkaApis as a nested class. Either way is
> probably fine, but they should be symmetric between the two.

Covered above (2).

> 11. I am not so sure about the logic of proactively responding to all
> outstanding requests with failure during a leadership change. Is this the
> right thing to do? 

Related to (2) above - we can discuss this more. The benefit of doing this
is that a DelayedProduce can be unblocked sooner (if the timeout is high)
and has a more accurate error code - i.e., the produce request failed for
this partition because its leader changed. However, not doing anything and
throwing a timeout is also fine - we just need to decide on one or the other
and document it for the producer API.

> 12. The logic in DelayedProduce.isSatisfied is very odd. I think what we
> want to do is respond to all outstanding requests in the event of a
> leadership change. But we do this by passing in the ReplicaManager and
> having logic that keys off whether or not the local broker is the leader.
> This seems quite convoluted. Wouldn't it make more sense to do this: (1)
> Have the replica manager allow "subscribers" to its state changes. (2) Add
> a subscriber that loops over all in-flight requests and expires them with
> an error when the local broker ceases to be the leader. There is a
> description of this pattern here http://codingjunkie.net/guava-eventbus/. 

I think there are two concerns you raise here:

(i) Responding to requests in the event of a leadership change: This was
intended as a safety net - the existing patch deals with the leadership
change in replicaManager (which is why replicaManager needs access to the
produceRequestPurgatory). However, if we choose not to handle the leader
change and just timeout, then we don't need that code in replicaManager and
we can remove this logic as well.

(ii) Passing in replicaManager - this is required because I need to get the
inSyncReplicas for a partition, and the replicaManager maintains that.
However, as described in (2) I think this can be directly accessed when I
move this back to KafkaApis.

> 13. Nice catch with the request size. I think it is okay that we are using
> sizeInBytes, since it is not expected that the producer send fragments,
> though a fully correct implementation would check that.

That's a good point - using sizeInBytes is (more) correct then.

> Also, another thought. This implementation is keeping a Map[(topic,
> partition)] => # acks. I wonder if it would be better to do this in terms
> of the acknowledged hw. Basically keep a global structure as part of
> replica manager that tracks the position of all replicas for all
> partitions. Then the isSatisfied check would just consult this structure to
> see if the replicas had advanced far enough. This seems like it would not
> require a per-request map of state and would be less fragile. Somehow this
> structure seems generally useful for monitoring and other purposes as
> well.

This is exactly how it is implemented now - let me know if I misunderstood
your comment. The acksPending is a boolean. The actual ack count is
determined from the acknowledged HW that the replicaManager keeps track of.
The map is still required to maintain other information such as the error
code (e.g., if there was an error in writing to the log), the offset for
each partition that followers need to fetch beyond for this request to be
unblocked, and if there are any acks pending (so we don't bother re-counting
unnecessarily).

                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13403221#comment-13403221 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

Thanks for patch v2. A couple more comments:

21. DelayedProduce.isSatisfied(): The following check seems to be wrong. If produce.requiredAcks < 0, numAcks is always going to be >= produce.requiredAcks, so the first clause is always true and the isr.size comparison is never reached.
              if (numAcks >= produce.requiredAcks ||
                (numAcks >= isr.size && produce.requiredAcks < 0)) {
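
One possible shape for a corrected check is to branch on the sign of requiredAcks first, so that a negative value can never satisfy the plain count comparison. This is a sketch of the logic only, with a hypothetical helper name (enoughAcks); it is not the code from the v3 patch.

```scala
object AckCheck {
  // requiredAcks < 0 means "wait for every replica in the current ISR";
  // otherwise wait for the requested number of acks.
  def enoughAcks(numAcks: Int, requiredAcks: Int, isrSize: Int): Boolean =
    if (requiredAcks < 0) numAcks >= isrSize
    else numAcks >= requiredAcks
}
```

With requiredAcks = -1 and an ISR of size 3, this returns false until three acks have been counted, whereas the quoted condition is true immediately.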

22. AsyncProducerTest seems to keep failing for me.
                

        

[jira] [Updated] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v3.patch

Thanks for catch-21 :) Fixed that - (note that requiredAcks cannot be 0 in isSatisfied.)

For 22, turned out to be due to inconsistent default values for the producer config. Test passes now.

Re: Jay's comment. It is true that the requests that collectSatisfied returns will not go through expiration (by virtue of the satisfied guard). The issue is that the client needs to be aware that the calls to expire and checkSatisfied can be simultaneous, and so it may need to synchronize on any data structures that its implementations of those methods access. So we can either synchronize in the purgatory or clearly document this. Personally, I'm leaning toward fixing it in the purgatory because:
- if checkSatisfied is called (and it will return true), it seems we should consider the deadline to have been met (although this is not clear cut) so expiration should not proceed.
- it makes the purgatory a little harder to use as the client needs to be aware of the race condition.

                

        

[jira] [Updated] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v2_to_v3.patch

Here is an incremental patch over v2 that makes the diff clearer.
                

        

[jira] [Updated] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v2.patch

Here is v2 - changes from v1:

- Rebased.
- Addressed all the "minor" comments.
- Using NonFollowerId (alias of DefaultReplicaId) instead of -1.
- For request timing: added the duration computation to the Response
  class and removed all the default -1.
- Added handling for requiredAcks < 0
- To deal with the race condition that Jay raised, did an explicit update
  after adding to the produceRequestPurgatory.
- Removed the proactive update to produceRequestPurgatory on a leader
  change. This facilitated moving the ProduceRequestPurgatory and
  DelayedProduce back to KafkaApis where they belong.
- I'm still doing the isLocal check in the isSatisfied method because we get
  it for free and NotLeaderForPartitionCode is more accurate than giving a
  timeout error code. We can discuss whether to remove this altogether or
  not.
- One thing v1 did not handle was negative timeouts, which should be
  interpreted as forever, so I converted negative values to Long.MaxValue. In
  doing this I hit what I thought was a bug in DelayQueue, but it turned out to
  be an overflow issue with DelayedItem - added checks for this.
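
The overflow mentioned in the last item can be illustrated as follows: once a negative timeout is mapped to Long.MaxValue, adding it to the current time wraps around to a negative deadline unless the sum is clamped. This is a hedged sketch with hypothetical names (Deadlines, effectiveTimeoutMs, deadlineMs) and an assumed non-negative clock value; it is not the DelayedItem code itself.

```scala
object Deadlines {
  // A negative timeout means "wait forever", represented as Long.MaxValue.
  def effectiveTimeoutMs(timeoutMs: Long): Long =
    if (timeoutMs < 0) Long.MaxValue else timeoutMs

  // nowMs + timeout would wrap to a negative number for very large
  // timeouts, so saturate at Long.MaxValue instead.
  // Assumes nowMs >= 0.
  def deadlineMs(nowMs: Long, timeoutMs: Long): Long = {
    val timeout = effectiveTimeoutMs(timeoutMs)
    if (timeout > Long.MaxValue - nowMs) Long.MaxValue else nowMs + timeout
  }
}
```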

Also, Jun asked about a race condition on partitionStatus - although
isSatisfied cannot be called simultaneously on the same DelayedProduce, I
think there is a race condition between expire/checkSatisfied in the
requestPurgatory.  I can switch to a ConcurrentHashMap for the
partitionStatus map, but I think this is an issue in the RequestPurgatory.
I think it should be easy to fix, but wanted to call it out first to see if
I'm incorrect:
- An incoming request triggers checkSatisfied on a DelayedItem
- During checkSatisfied the DelayedItem expires and the expiration loop
  calls expire().
- There is a satisfied guard on the DelayedItem but that is checked only
  after expire() and after checkSatisfied() so it is possible for both
  checkSatisfied and expire to be called and it doesn't seem to make sense
  to allow both - it's one or the other.

                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13403640#comment-13403640 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

A few more comments on patch v3:

31. DelayedProduce.isSatisfied(): 
31.1 We iterate leader.partition.inSyncReplicas here. However, ISR can be updated concurrently. We probably need to make leader.partition.inSyncReplicas an AtomicReference.
31.2 Could you add a comment that explains what produce.requiredAcks < 0 means?

For the race condition between expire() and checkSatisfied, I agree that it's probably easier if RequestPurgatory does the synchronization on DelayedRequest.
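
For 31.1, the AtomicReference idea could look roughly like the sketch below, assuming the ISR is held as an immutable set of replica ids that is swapped wholesale on every change. IsrHolder and its methods are hypothetical names, not the Partition class from the code base.

```scala
import java.util.concurrent.atomic.AtomicReference

// Holds the ISR as an immutable set behind an AtomicReference.
class IsrHolder(initial: Set[Int]) {
  private val isr = new AtomicReference[Set[Int]](initial)

  // Readers get an immutable snapshot that is safe to iterate even
  // if the ISR is replaced concurrently.
  def current: Set[Int] = isr.get()

  // Writers replace the whole set rather than mutating it in place.
  def update(newIsr: Set[Int]): Unit = isr.set(newIsr)
}
```

A caller such as isSatisfied would read current once and iterate over that snapshot, so a concurrent shrink or expansion cannot change the set in the middle of the loop.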
                
> tie producer-side ack with high watermark and progress of replicas
> ------------------------------------------------------------------
>
>                 Key: KAFKA-353
>                 URL: https://issues.apache.org/jira/browse/KAFKA-353
>             Project: Kafka
>          Issue Type: Sub-task
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Joel Koshy
>         Attachments: kafka-353_v1.patch, kafka-353_v2.patch, kafka-353_v2_to_v3.patch, kafka-353_v3.patch
>
>



        

[jira] [Resolved] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy resolved KAFKA-353.
------------------------------

    Resolution: Fixed

Thanks for the review - committed to 0.8 and opened jiras for the other issues.
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400204#comment-13400204 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

The suggestions for future improvements all make sense. We can create new jiras to track them.
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jay Kreps (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400595#comment-13400595 ] 

Jay Kreps commented on KAFKA-353:
---------------------------------

This doesn't seem to apply cleanly on 0.8; I get conflicts on ReplicaFetcherThread and ReplicaManager.

A few comments:
1. There is a race condition between adding to the purgatory and acknowledgement, I think. We first produce locally and then add the watcher, which allows a window where the message is available for replication with no watcher on ack. If a replica acknowledges a given produce before the DelayedProduce is added as a watcher, that will not be recorded, I think, and the request will potentially time out waiting for ISR acks. This would certainly happen and lead to sporadic timeouts. This is tricky; let's discuss. One solution would be to reverse the logic: add the watcher first, then produce locally. But this means having to handle the case where the local produce fails (we don't want to leak watchers).
2. As you say, you definitely can't pass KafkaApis to anything; making DelayedProduceRequest an inner class of KafkaApis will fix this, I think.
3. I think the way we are tying the api handling and replica manager is not good. The replica manager shouldn't know about DelayedProduce requests if possible; it should only know about replica management and be oblivious to the api level.
4. Why is the ack timeout in seconds? It should be milliseconds like everything else, right? One would commonly want a sub-second timeout.
5. As a stylistic thing it would be good to avoid direct use of Tuple2[A,B] and instead use (A,B) (e.g. myMethod: List[(A,B)] instead of myMethod: List[Tuple2[A,B]]).
6. I don't understand why we are passing (topic, partition) to the purgatory as BOTH the key and request. This is a bit hacky since this is not the request. Let's discuss; we may need to clean up the purgatory api to make it gracefully handle this case.
7. I think the ProducerRequestPurgatory should be instantiated in KafkaApis not KafkaServer--it is an implementation detail of the api layer not a major subsystem.
8. The doubly nested map/flatMap in KafkaApis.handleProduceRequest is a little bit tricky. Could you clean that up or, if that is not possible, add a comment on what it does? (I think it just makes a list of (topic, partition) pairs, but the variable name doesn't explain it.) Same with KafkaApis.handleFetchRequest.
9. DelayedProduce should definitely not be in the kafka.api package; that package is for request/response "structs". DelayedProduce is basically a bunch of server internals and not something the producer or consumer should be aware of. Likewise, the code for ProduceRequestPurgatory/FetchRequestPurgatory is kept differently, but these things are mirror images of one another. FetchRequestPurgatory is in KafkaApis, ProduceRequestPurgatory in its own file. These should match each other. The reasoning behind keeping things in KafkaApis was that those classes contained part of the logic for processing a request, so splitting it into two files makes it harder to read (the counter argument is that KafkaApis is getting pretty big). The other reason was so it could access the variables in KafkaApis as a nested class. Either way is probably fine, but they should be symmetric between the two.
10. KafkaApis.handleProducerRequest and KafkaApis.produce are a little hard to differentiate. I think the latter is effectively "produce to local log" and the former does the purgatory stuff. Would be good to call this out in the method name or comment.
11. I am not so sure about the logic of proactively responding to all outstanding requests with failure during a leadership change. Is this the right thing to do?
12. The logic in DelayedProduce.isSatisfied is very odd. I think what we want to do is respond to all outstanding requests in the event of a leadership change. But we do this by passing in the ReplicaManager and having logic that keys off whether or not the local broker is the leader. This seems quite convoluted. Wouldn't it make more sense to do this: (1) Have the replica manager allow "subscribers" to its state changes. (2) Add a subscriber that loops over all in-flight requests and expires them with an error when the local broker ceases to be the leader. There is a description of this pattern here http://codingjunkie.net/guava-eventbus/.
13. Nice catch with the request size. I think it is okay that we are using sizeInBytes, since it is not expected that the producer send fragments, though a fully correct implementation would check that.
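
As a rough illustration of the subscriber idea in point 12, here is a minimal Python sketch (not the patch's Scala). The class and method names, and the "not_leader" error string, are hypothetical placeholders. The point it tries to show is that the replica manager only publishes a state change, while the api layer subscribes and expires its own in-flight requests.

```python
class ReplicaManager:
    """Hypothetical replica manager that only publishes state changes."""

    def __init__(self):
        self._listeners = []

    def subscribe(self, listener):
        self._listeners.append(listener)

    def leadership_lost(self, topic, partition):
        # The replica manager knows nothing about produce requests; it
        # just notifies whoever subscribed.
        for listener in self._listeners:
            listener(topic, partition)


class ProducePurgatory:
    """Hypothetical holder of in-flight delayed produce requests."""

    def __init__(self):
        self.pending = {}  # (topic, partition) -> list of request ids
        self.failed = []   # (request id, error) pairs that were expired

    def add(self, topic, partition, request_id):
        self.pending.setdefault((topic, partition), []).append(request_id)

    def on_leadership_lost(self, topic, partition):
        # Expire every request waiting on this partition with an error.
        for request_id in self.pending.pop((topic, partition), []):
            self.failed.append((request_id, "not_leader"))
```

Wiring is a single call, manager.subscribe(purgatory.on_leadership_lost), made by whoever owns both objects, so neither class needs a reference to the other's type.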
                

        

[jira] [Updated] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Joel Koshy (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-353:
-----------------------------

    Attachment: kafka-353_v1.patch

Patch is not too big, but a bit tricky. To help with review, here is an
overview:

- DelayedProduce:
  - Contains the logic to determine if it can be unblocked or not. The
    partitionStatus map is necessary to keep track of the local log's
    offset, error status, and whether acks are still pending.  The comments
    in the code should make the remaining logic clear.
  - Handling delayed producer requests brings in dependencies on the fetch
    request purgatory, replica manager and KafkaZookeeper - which is why I
    had to pass in kafkaApis to the DelayedProduce which is a bit ugly.
- KafkaApis:
  - The existing code for handling produce requests would respond when the
    leader persists the data to disk. We still do that if requiredAcks is 0
    or 1. For other values, we now create a DelayedProduce request which
    will be satisfied when requiredAcks followers are caught up.  If the
    |ISR| < requiredAcks then the request will time out.
  - handleFetchRequest: if a request comes from a follower replica, check
    the produceRequestPurgatory and see if DelayedProduce requests can be
    unblocked.
- ReplicaManager:
  - The only change is to try and unblock DelayedProduce requests to
    partitions for which the leader changed.
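
The ack handling described in the overview above can be sketched roughly as follows. This is Python rather than the patch's Scala, the function names are hypothetical, and the mapping from follower id to fetched offset is an assumed representation. Negative requiredAcks values are deliberately left out of this sketch.

```python
def needs_delayed_produce(required_acks):
    """Per the overview: 0 or 1 is answered once the leader has persisted."""
    return required_acks not in (0, 1)


def is_satisfied(required_acks, required_offset, follower_offsets):
    """True once at least required_acks followers reached required_offset.

    follower_offsets maps a follower id to the offset it has fetched up
    to (an assumed representation, not the patch's data structure).
    """
    if required_acks < 2:
        raise ValueError("this sketch only covers requiredAcks >= 2")
    caught_up = sum(1 for offset in follower_offsets.values()
                    if offset >= required_offset)
    return caught_up >= required_acks
```

If fewer followers exist than requiredAcks, is_satisfied can never return True, which corresponds to the request timing out when |ISR| < requiredAcks.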

Note that even if a request times out, some of the partitions may have been
successfully acked - so even if one partition times out the global error
code is NoError. The receiver must check the errors array to determine if
there are any failures. I think this brings in the need for a
"PartialSuccess" global error code in the ProducerResponse. Thoughts on
this?

I think there was a bug in checking satisfied DelayedFetchRequests: the
checkSatisfied method would take a produceRequest's TopicData and see if the
total number of bytes in that produce request could satisfy the remaining
bytes to be filled in the DelayedFetchRequest. However, that could count
data for partitions that were not part of the DelayedFetchRequest.  This
patch fixes that issue as well - changed the FetchRequestPurgatory key from
TopicData to PartitionData and check on a per-partition basis.
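
A small Python sketch of the difference between the two checks, under the assumption that produced bytes are tracked per (topic, partition) pair; the function names and the dict/set representation are hypothetical and only meant to show why the per-topic count overestimates.

```python
def bytes_counted_per_topic(produced, fetch_partitions):
    """Old behaviour: sums every partition of any topic the fetch touches."""
    topics = {topic for topic, _ in fetch_partitions}
    return sum(size for (topic, _), size in produced.items()
               if topic in topics)


def bytes_counted_per_partition(produced, fetch_partitions):
    """Fixed behaviour: counts only partitions the fetch asked for."""
    return sum(size for key, size in produced.items()
               if key in fetch_partitions)
```

For a produce request carrying 500 bytes for ("t", 0) and 700 bytes for ("t", 1), a fetch that only asked for ("t", 0) is credited 1200 bytes by the first function and 500 by the second.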

Another potential issue is that the DelayedFetchRequest satisfied counts
using MessageSet's sizeInBytes, which could include incomplete messages - as
opposed to iterating over the valid messages and getting the size. I left
that as is. I think it is debatable which approach is correct in this case.

I added some trace logs at individual request level - e.g., "Produce request
to topic unblocked n delayedfetchrequests". These would be more useful if we
add a uuid to each produce request - I think this idea was tossed around on
the mailing list sometime before. Doesn't even have to be uuid or part of
the produceRequest's wire format - even an atomicLong counter (internal to
the broker) may be helpful - thoughts?

There is this corner case that I think is handled correctly but want to make
sure:
- leader receives a producer request and adds it to the
  ProduceRequestPurgatory.
- leadership changes while it is pending, so the error code for that
  partition is set to NotLeaderErrorForPartitionCode
- leadership changes back to this broker while the DelayedProduce is
  pending.
- In this scenario, the partition remains in the error state.
- I think it is correct because the leader would have become a follower
  (before it became a leader again), and would have truncated its log to the
  intermediate leader's HW.

If requiredAcks == |ISR| and the |ISR| shrinks while the DelayedProduce is
pending, the request may time out. However, if the |ISR| expands back to its
original size while it is still pending, it will get satisfied.

Let me know if you can think of other corner cases that need to be
considered - I wouldn't be surprised if there are quite a few.

I only did limited testing with the ProducerTest.

I think this opens up the following future work (separate jiras):
- Enhance system test to test all corner cases (leader changing while
  request pending, ISR shrinking while request pending, etc.).
- ProducerResponse currently uses two separate arrays for errors and
  offsets; and ProducerRequest uses an array of TopicData each of which
  contains an array of PartitionData. It may be a good idea to improve these
  classes to use maps/something else as I had to resort to using find and
  indexOf to locate partition-level data in the original request.
- We should add some mbeans for request purgatory stats - avg. hold time,
  outstanding requests, etc.
- We should try and get rid of sleeps and fix all intermittent test
  failures.

If the above list sounds good I'll file the jiras.

                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400201#comment-13400201 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

Thanks for patch v1. Looks good overall. Some comments:

1. TopicData.partitionDatas: data is the plural form of datum. So datas feels weird. How about partitionDataArray?

2. KafkaApis:
2.1 maybeUnblockDelayedRequests: put fetchRequestPurgatory.update in 1 line
2.2 handleFetchRequest: 
For the following line:
    if(fetchRequest.replicaId != -1) {
Instead of using -1, could we create a constant like NoneFollowerReplicaId?
2.3 handleFetchRequest: When creating responses, we should fill in the elapsed time instead of passing in -1. Note that the elapsed time is in nanoseconds. So, we probably should rename Response.elapsed to something like elapsesNs. Ditto for handleProduceRequest.

3. DelayedProduce:
3.1 isSatisfied(): need to handle the case when requiredAcks is default (probably any value <0). This means whenever we get the ack from every replica in the current ISR, the request is satisfied. This can be done by simply making sure leader.HW >= delayedProduce.partitionStatus(followerFetchPartition).localOffset.
3.2 Could we change localOffsets to something like requiredOffsets?
3.3 respond(): need to compute the elapsed time in Response.
3.4 We need to be aware that isSatisfied can be called concurrently on the same DelayedProduce object. I am not sure if it's really necessary, but maybe we should consider using AtomicBoolean for PartitionStatus.acksPending?

4. ProducerRequest: ackTimeoutSecs should be ackTimeoutMs.

5. ReplicaManager: I would vote for not checking ProducerRequestPurgatory when there is a leadership change, since the logic is simpler. We just let the producer requests timeout. If we do want to do the check here, we should share the ProducerRequestPurgatory object used in KafkaApis.

6. SyncProducerConfigShared: We should probably make the default requiredAcks -1 (i.e., wait for an ack from each replica in the ISR).

7. TopicMetadataTest: no need to change

8. 0.8 has moved, so the patch needs to be rebased.
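
A small Python sketch of the check suggested in 3.1, for the case where requiredAcks is negative. The function names are hypothetical, and the sketch assumes the leader's high watermark is the minimum log end offset across the replicas currently in the ISR, which is why comparing it to the required offset is equivalent to "every ISR replica has acked".

```python
def high_watermark(isr_log_end_offsets):
    """Assumed definition: smallest log end offset among ISR replicas."""
    return min(isr_log_end_offsets.values())


def satisfied_by_high_watermark(leader_hw, required_offset):
    """requiredAcks < 0: satisfied once the HW reaches the required offset."""
    return leader_hw >= required_offset
```

Because the check only looks at the HW, it does not need to iterate over the ISR at all, and it automatically follows the ISR as it shrinks or expands.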

                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13404182#comment-13404182 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

It's ok to handle the remaining issues in separate jiras. +1 from me on committing patch v3.
                

        

[jira] [Commented] (KAFKA-353) tie producer-side ack with high watermark and progress of replicas

Posted by "Jun Rao (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13400682#comment-13400682 ] 

Jun Rao commented on KAFKA-353:
-------------------------------

The race condition that Jay raised seems like a potential issue. However, it may be tricky to add the watcher before writing to the local log in the leader, because the watcher needs to know the end offset of the messages in the leader's log. Another way to address the race condition is to still add the messages to the leader's log first and then add the watcher, but, once the watcher is added, explicitly trigger a watcher check by calling ProducerRequestPurgatory.update.
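
The ordering proposed above can be sketched as follows. This is a simplified Python model, not the patch's Scala: Purgatory, watch, update and handle_produce are hypothetical names, the log is a plain list whose length stands in for the end offset, and a single follower is assumed.

```python
import threading


class Purgatory:
    """Hypothetical purgatory keyed by (topic, partition)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._watchers = {}

    def watch(self, key, request):
        with self._lock:
            self._watchers.setdefault(key, []).append(request)

    def update(self, key, follower_offset):
        """Remove and return the requests satisfied by follower_offset."""
        with self._lock:
            waiting = self._watchers.get(key, [])
            done = [r for r in waiting
                    if follower_offset >= r["required_offset"]]
            self._watchers[key] = [r for r in waiting
                                   if follower_offset < r["required_offset"]]
            return done


def handle_produce(purgatory, key, log, messages, current_follower_offset):
    log.extend(messages)                     # 1. append to the leader's log
    request = {"required_offset": len(log)}  # end offset is only known now
    purgatory.watch(key, request)            # 2. register the watcher
    # 3. Re-check right away, in case the follower caught up between
    #    steps 1 and 2 and its fetch found no watcher to unblock.
    return purgatory.update(key, current_follower_offset())
```

Here current_follower_offset is a callable returning the follower's latest fetched offset. The explicit update in step 3 is what closes the window: a fetch that arrived before the watcher existed is accounted for as soon as the watcher is registered.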
                