Posted to dev@kafka.apache.org by Raul Castro Fernandez <ra...@contxt.in> on 2014/07/17 19:37:48 UTC

Review Request 23648: Patch for KAFKA-1524

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23648/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1524
    https://issues.apache.org/jira/browse/KAFKA-1524


Repository: kafka


Description
-------

KAFKA-1524; Implement transactional producer


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e 
  clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java c0f1d57e0feb894d9f246058cd0396461afe3225 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 36e8398416036cab84faad1f07159e5adefd8086 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java f9de4af426449cceca12a8de9a9f54a6241d28d8 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/record/Compressor.java 0323f5f7032dceb49d820c17a41b78c56591ffc4 
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
  clients/src/main/java/org/apache/kafka/common/record/Record.java 10df9fd8d3f4ec8c277650fa7eab269f3ea30d85 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05 
  clients/src/test/java/org/apache/kafka/clients/producer/TransactionContextTest.java PRE-CREATION 
  clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java 94a11121e207d5cf94dbc94443a8aa7edf387782 

Diff: https://reviews.apache.org/r/23648/diff/


Testing
-------


Thanks,

Raul Castro Fernandez


Re: Review Request 23648: Patch for KAFKA-1524

Posted by Raul Castro Fernandez <ra...@contxt.in>.

> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 92
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line92>
> >
> >     What if the coordinator itself fails after a while and needs to be re-queried? Not sure if you intend for all of these to be covered in the subsequent work that addresses failure scenarios more thoroughly.
> >

Yes, I will revisit this once we check all failure scenarios, as there are other changes we may need to make.


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 146
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line146>
> >
> >     I think it would be clearer to use a negated condition. i.e., if (txStatus != TransactionStatus.NOTRANSACTION)

There are more than two states. With the negated condition, we would not be able to start a transaction when the status is ABORTED, for example.
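A minimal sketch of the two guards being compared, assuming the patch rejects beginTransaction only while a transaction is ONGOING. The status names NOTRANSACTION, ONGOING and ABORTED appear in this thread, and COMMITTED is inferred from the mention of committed transactions; the class and method names are hypothetical. It shows why the negated form blocks a new transaction after an abort: ABORTED is not NOTRANSACTION.

```java
// Hypothetical sketch of the begin() guard discussed above. The status
// names come from this thread; the class and method names do not.
class BeginGuardSketch {
    enum TransactionStatus { NOTRANSACTION, ONGOING, COMMITTED, ABORTED }

    // Assumed original guard: only an ONGOING transaction blocks begin(),
    // so a finished (COMMITTED or ABORTED) transaction does not.
    static boolean canBegin(TransactionStatus s) {
        return s != TransactionStatus.ONGOING;
    }

    // Negated form suggested in review, if (txStatus != NOTRANSACTION) throw:
    // begin() is allowed only from NOTRANSACTION, so it is rejected after
    // a commit or an abort.
    static boolean canBeginNegated(TransactionStatus s) {
        return s == TransactionStatus.NOTRANSACTION;
    }
}
```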


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 158
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line158>
> >
> >     if (txStatus != TransactionStatus.ONGOING)

Similar to the previous case: here we would be allowing a committed transaction to be aborted.


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 170
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line170>
> >
> >     if (txStatus != TransactionStatus.ONGOING)

In this case, repeating a commit command would throw an exception. I'm not sure that is what we want, as there might be failure scenarios where we need to be able to repeat a commit. Maybe I should wait until the failure handling is done and then revisit this?
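The trade-off can be sketched as a commit guard with an optional retry path. This is a hypothetical illustration, not the patch's TransactionContext: the allowRepeatedCommit flag is invented here to show a repeated commit treated as a no-op, next to the strict "if (txStatus != TransactionStatus.ONGOING)" check suggested in review, which rejects it.

```java
// Hypothetical sketch contrasting a strict commit() guard with one that
// tolerates a repeated commit. Names are illustrative, not from the patch.
class CommitGuardSketch {
    enum TransactionStatus { NOTRANSACTION, ONGOING, COMMITTED, ABORTED }

    static class InvalidTransactionStatusException extends RuntimeException {
        InvalidTransactionStatusException(String msg) { super(msg); }
    }

    TransactionStatus txStatus = TransactionStatus.NOTRANSACTION;
    private final boolean allowRepeatedCommit; // hypothetical switch

    CommitGuardSketch(boolean allowRepeatedCommit) {
        this.allowRepeatedCommit = allowRepeatedCommit;
    }

    void begin() {
        if (txStatus == TransactionStatus.ONGOING)
            throw new InvalidTransactionStatusException("transaction already ongoing");
        txStatus = TransactionStatus.ONGOING;
    }

    void commit() {
        // Optional retry path: committing an already committed transaction
        // again is treated as a no-op instead of an error.
        if (allowRepeatedCommit && txStatus == TransactionStatus.COMMITTED)
            return;
        // Strict guard suggested in review: only an ONGOING transaction commits.
        if (txStatus != TransactionStatus.ONGOING)
            throw new InvalidTransactionStatusException("cannot commit in state " + txStatus);
        txStatus = TransactionStatus.COMMITTED;
    }
}
```

Which behaviour is right depends on the failure-handling design still to come; the flag only makes the two options explicit.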


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 215
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line215>
> >
> >     Each check is a full iteration. Rather, can we just keep a count of pending messages for this check?

Yes, the iteration also removes messages. The idea is to remove them lazily, only when we need to check whether there are more to send. The alternative seems more complex, as we would need to attach a listener to check for and remove received messages. Also, by keeping the removal logic in the same method that counts, we avoid having to synchronize the listener with this thread, which makes it easier to return an exact count.
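A minimal sketch of the lazy approach, assuming each pending send is tracked as a java.util.concurrent.Future (KafkaProducer.send returns a Future<RecordMetadata>). The class and method names are hypothetical. Completed futures are pruned only when the count is requested, so no completion listener touches the list, at the cost of a full iteration per check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Future;

// Hypothetical sketch: acknowledged sends are removed lazily, only when
// the pending count is requested, so no listener mutates shared state.
class LazyPendingTracker {
    private final List<Future<?>> inFlight = new ArrayList<>();

    synchronized void add(Future<?> f) {
        inFlight.add(f);
    }

    // Walks the whole list, drops completed futures and returns how many remain.
    synchronized int pendingCount() {
        Iterator<Future<?>> it = inFlight.iterator();
        while (it.hasNext()) {
            if (it.next().isDone())
                it.remove();
        }
        return inFlight.size();
    }
}
```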


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 186
> > <https://reviews.apache.org/r/23648/diff/1/?file=634383#file634383line186>
> >
> >     One issue with this call is that it can block and add to the total time that poll needs to wait, which breaks the API contract of poll (i.e., it should return in at most timeout ms). Can we fold these into the poll below?
> >     
> >     Also, especially since the method name is prefixed with maybe..., it would be cleaner to move the txContext null check inside the method.

Regarding poll, do you mean putting the maybeUpdate... methods inside poll? That would require passing at least two additional references. What if I instead measure the time spent in these maybeUpdate... calls and subtract it from the remaining timeout for poll? I think this should be accurate enough.
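A sketch of the timeout-subtraction idea, with a hypothetical Selector interface and a Runnable standing in for the maybeUpdate... calls; neither is the actual NetworkClient code.

```java
// Hypothetical sketch: time spent in the maybeUpdate... calls is deducted
// from the poll timeout. Selector here is a placeholder interface.
class TimeBudgetedPoll {
    interface Selector { void poll(long timeoutMs); }

    private final Selector selector;

    TimeBudgetedPoll(Selector selector) {
        this.selector = selector;
    }

    void poll(long timeoutMs, Runnable maybeUpdates) {
        long start = System.currentTimeMillis();
        maybeUpdates.run(); // stands in for the metadata / tx coordinator updates
        long elapsed = System.currentTimeMillis() - start;
        // Never pass a negative timeout; 0 means poll without blocking.
        long remaining = Math.max(0, timeoutMs - elapsed);
        selector.poll(remaining);
    }
}
```

This only shrinks the poll portion: if the update calls themselves block for longer than timeoutMs, the contract Joel mentions is still broken, so they would need to be non-blocking for the bound to hold.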


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 463
> > <https://reviews.apache.org/r/23648/diff/1/?file=634383#file634383line463>
> >
> >     Offset commits are only needed to redo aborted transactions - i.e., typically in a failure scenario - so this is not really required for the scope of this jira right? Furthermore, we would want the partition of the offsets topic to be part of the transaction itself and the offset manager is not transaction aware either.

Correct. I started sketching this and should not have included it in this patch. I will provide a complete version once we handle failure scenarios.


> On July 22, 2014, 7:59 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java, line 218
> > <https://reviews.apache.org/r/23648/diff/1/?file=634391#file634391line218>
> >
> >     This may wait unnecessarily, i.e., who interrupts the wait?

True. My intention was to check whether all ACKs have been received and, if not, wait for a fraction of the total timeout. Instead of wait(remainingWaitMs), it would be something like wait(remainingWaitMs/4).
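A sketch of the sliced wait, assuming acknowledgements are counted in a field guarded by the object's monitor; AckWaiter and its methods are hypothetical. It also addresses the "who interrupts?" question by having the acknowledging thread call notifyAll, while the slice bounds how long a missed notification can delay the waiter. Note that remainingWaitMs/4 reaches 0 when fewer than 4 ms remain, and Object.wait(0) waits indefinitely, so the slice is clamped to at least 1 ms.

```java
// Hypothetical sketch: wait in slices of the remaining time and re-check,
// while acknowledgements also notify the waiter directly.
class AckWaiter {
    private int pendingAcks;

    AckWaiter(int pendingAcks) {
        this.pendingAcks = pendingAcks;
    }

    // Called when an ACK arrives; wakes the waiter early.
    synchronized void ack() {
        pendingAcks--;
        notifyAll();
    }

    // Returns true if all ACKs arrived before the deadline.
    synchronized boolean awaitAllAcks(long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (pendingAcks > 0) {
            long remainingWaitMs = deadline - System.currentTimeMillis();
            if (remainingWaitMs <= 0)
                return false;
            // A fraction of the remaining time, never 0 (wait(0) blocks forever).
            wait(Math.max(1, remainingWaitMs / 4));
        }
        return true;
    }
}
```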


- Raul


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23648/#review48338
-----------------------------------------------------------




Re: Review Request 23648: Patch for KAFKA-1524

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23648/#review48338
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84840>

    Instead of repeating the assignments here, can you call this(selector, metadata, ...) and then assign only txContext?
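A sketch of the suggested constructor delegation, using a hypothetical ClientSketch class with placeholder field types rather than NetworkClient's real parameters.

```java
// Hypothetical sketch of constructor delegation; types are placeholders.
class ClientSketch {
    private final Object selector;
    private final Object metadata;
    private Object txContext; // null for a non-transactional client

    ClientSketch(Object selector, Object metadata) {
        this.selector = selector;
        this.metadata = metadata;
    }

    // Transactional variant: delegate the shared assignments,
    // then set only the extra field.
    ClientSketch(Object selector, Object metadata, Object txContext) {
        this(selector, metadata);
        this.txContext = txContext;
    }

    Object selector() { return selector; }

    Object txContext() { return txContext; }
}
```

Because txContext is assigned after this(...), it cannot be final; if it should be final, delegate the other way, with the two-argument constructor calling this(selector, metadata, null).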



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84857>

    One issue with this call is that it can block and add to the total time that poll needs to wait, which breaks the API contract of poll (i.e., it should return in at most timeout ms). Can we fold these into the poll below?
    
    Also, especially since the method name is prefixed with maybe..., it would be cleaner to move the txContext null check inside the method.
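A sketch of moving the null check into the method itself, so callers can invoke it unconditionally. TxMetadataUpdater, TxContext and maybeUpdateTxCoordinator are hypothetical names, not the patch's.

```java
// Hypothetical sketch: the maybe... method owns the null check.
class TxMetadataUpdater {
    interface TxContext {
        boolean needsCoordinatorUpdate();
        void requestCoordinator(long now);
    }

    private final TxContext txContext; // null when the producer is not transactional

    TxMetadataUpdater(TxContext txContext) {
        this.txContext = txContext;
    }

    // Returns true if a coordinator request was issued.
    boolean maybeUpdateTxCoordinator(long now) {
        if (txContext == null || !txContext.needsCoordinatorUpdate())
            return false; // nothing to do; callers need no null check
        txContext.requestCoordinator(now);
        return true;
    }
}
```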



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84879>

    No need for now - right?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84880>

    And here?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84881>

    I should have brought this up in the other RB: why is the group id an int, and why 0? The group id is a string in an OCR (OffsetCommitRequest).



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84882>

    Should we break out the maybe-update logic for the tx coordinator metadata into its own method (with its own call to leastLoadedNode)? That is, the txcoordinatormetadata request need not be gated on, or coupled with, topic metadata requests.



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/23648/#comment84874>

    Offset commits are only needed to redo aborted transactions - i.e., typically in a failure scenario - so this is not really required for the scope of this jira right? Furthermore, we would want the partition of the offsets topic to be part of the transaction itself and the offset manager is not transaction aware either.



clients/src/main/java/org/apache/kafka/clients/producer/InvalidTransactionStatusException.java
<https://reviews.apache.org/r/23648/#comment84878>

    Comment needs to be fixed.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/23648/#comment84884>

    Can make this a bit more concise by calling the constructor with this.txContext.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/23648/#comment84887>

    tx.fetch is a slightly odd name: why is it a fetch?



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/23648/#comment84888>

    flushRequested



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84867>

    You mean transactional producers.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84868>

    Prefer PENDING or OPEN to ONGOING.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84869>

    The name is a bit confusing since we also have txcoordinatormetadata - I'm wondering if it helps significantly to have what seems to be a specialized context nested inside a context.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84870>

    updateTxCoordinatorMetadata



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84871>

    Can you make the comment a bit clearer - i.e., what offsets specifically does this refer to? I think you mean start offsets/state of the input partitions of this transaction.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84866>

    Let us call it txGroupId instead.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84872>

    What if the coordinator itself fails after a while and needs to be re-queried? Not sure if you intend for all of these to be covered in the subsequent work that addresses failure scenarios more thoroughly.
    



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84873>

    Same here.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84889>

    For this sort of API it may be better/clear enough to just have a getter and have the caller do a null check.



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84890>

    I think it would be clearer to use a negated condition. i.e., if (txStatus != TransactionStatus.NOTRANSACTION)



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84891>

    if (txStatus != TransactionStatus.ONGOING)



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84892>

    if (txStatus != TransactionStatus.ONGOING)



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84895>

    Each check is a full iteration. Rather, can we just keep a count of pending messages for this check?
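For comparison with the lazy approach, a sketch of the counter suggested here, with hypothetical hook names: the count is incremented on send and decremented from the completion callback, so each check is O(1). Send callbacks may run on a different thread (the producer's I/O thread) from the one checking, which is why an AtomicInteger is used.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a pending-message counter.
class CountingPendingTracker {
    private final AtomicInteger pending = new AtomicInteger();

    // Called when a transactional record is handed to the sender.
    void onSend() { pending.incrementAndGet(); }

    // Called from the send callback once the record completes (acked or failed).
    void onCompletion() { pending.decrementAndGet(); }

    boolean hasPending() { return pending.get() > 0; }

    int pendingCount() { return pending.get(); }
}
```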



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84894>

    This may wait unnecessarily, i.e., who interrupts the wait?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84893>

    Unnecessary catch?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionContext.java
<https://reviews.apache.org/r/23648/#comment84886>

    Is there a significant benefit in nesting this here?



clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionControl.java
<https://reviews.apache.org/r/23648/#comment84897>

    Maybe we should set prepare_commit/abort to 3 and 4.



clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorNotAvailableException.java
<https://reviews.apache.org/r/23648/#comment84898>

    Incorrect comment.



clients/src/main/java/org/apache/kafka/common/errors/TransactionFailedException.java
<https://reviews.apache.org/r/23648/#comment84899>

    Same


- Joel Koshy

