Posted to dev@kafka.apache.org by Jiangjie Qin <be...@gmail.com> on 2015/03/09 05:04:49 UTC

Review Request 31850: Patch for KAFKA-1660

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description
-------

Patch for KAFKA-1660: add a close method with a timeout to the producer.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.

> On April 7, 2015, 1:28 a.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 362
> > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362>
> >
> >     As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we can figure out a way to avoid leaking details of batch completion (which is currently exclusively in sender) into the RecordAccumulator.

Actually, since the incomplete batches list was introduced when we added the flush() call, we were already sort of leaking it into the accumulator before this patch. So I feel it is not that bad to add this list to the accumulator.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
-----------------------------------------------------------


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated March 27, 2015, 11:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 7, 2015, 1:28 a.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 362
> > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362>
> >
> >     As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we can figure out a way to avoid leaking details of batch completion (which is currently exclusively in sender) into the RecordAccumulator.
> 
> Guozhang Wang wrote:
>     Actually, since the incomplete batches list was introduced when we added the flush() call, we were already sort of leaking it into the accumulator before this patch. So I feel it is not that bad to add this list to the accumulator.

Yes, the incomplete batch set was added for flush(). We may be able to just get the incomplete batches from the record accumulator and fail them all in the sender. In that case batch.done will only ever be called in the sender, but we would need to expose the accumulator to the sender.
I actually just found another synchronization problem between Accumulator.close and Accumulator.append. It is possible for a user thread to append a message after the accumulator is closed, in which case we might miss the callback for that last message.


> On April 7, 2015, 1:28 a.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 365
> > <https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line365>
> >
> >     I don't think we should overload InterruptException for this. InterruptException is a wrapper around InterruptedException. i.e., after an InterruptException the thread should in fact have been interrupted - i.e., the interrupt status of the thread should be true (which is not the case here).

That makes sense. My intention was to say that the send was interrupted. Maybe it's better to use IllegalStateException.


> On April 7, 2015, 1:28 a.m., Joel Koshy wrote:
> > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala, line 333
> > <https://reviews.apache.org/r/31850/diff/5/?file=908545#file908545line333>
> >
> >     Can you also add a test for calling close with a non-zero timeout in the callback?

It is actually tested at line 394, but the comment there was not right. I'll fix that.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
-----------------------------------------------------------


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
-----------------------------------------------------------


Thanks for the patch.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128121>

    [NOTE] is not a standard javadoc highlight, is it? (I don't know.) If not, can you just use the standard <strong> for emphasis?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128123>

    We are doing this because -> "We do this because" or "This is done because"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128125>

    previous -> outstanding



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128126>

    was not able -> is unable
    before timeout -> before the specified timeout



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128134>

    If timeout > 0, this method blocks as it tries to join the sender thread within the specified timeout.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128135>

    ==
    
    Also, is this completely true? It seems we may join (albeit without trying to send anything further) if called from a non-sender thread.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128138>

    We do this because the sender thread would otherwise try to join itself and block forever.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128142>

    "When" -> "If an"
    
    That said, this doc is a bit weird - if that is what the user is supposed to do then why can't this method take care of it (i.e., let the interrupted exception go)? It seems the right thing to do would be to just propagate and let the caller decide what to do.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128144>

    "with timeout = {} ms"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128141>

    elegantly -> gracefully



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128147>

    ```log.warn("Overriding close timeout {} ms to 0 ms in order to prevent deadlock due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout)```
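
The close behaviour discussed in the comments above (join the sender thread within the timeout, override the timeout to 0 when close is invoked from a callback on the sender thread, then fall through to a force close) can be sketched roughly as follows. This is only an illustration under stated assumptions: `ToyProducer`, `CloseDemo`, `submitCallback` and `awaitTermination` are hypothetical names that use nothing but the JDK, and they are not the code in KafkaProducer.java.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for the producer; NOT the real KafkaProducer.
class ToyProducer {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean forceClosed = new AtomicBoolean(false);
    private final AtomicReference<Runnable> pendingCallback = new AtomicReference<>();
    private final Thread ioThread = new Thread(this::runLoop, "toy-sender");

    ToyProducer() {
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Sender loop: callbacks run on the I/O thread.
    private void runLoop() {
        while (running.get()) {
            Runnable cb = pendingCallback.getAndSet(null);
            if (cb != null) cb.run();
            else LockSupport.parkNanos(1_000_000L); // idle for about 1 ms
        }
    }

    void submitCallback(Runnable cb) { pendingCallback.set(cb); }

    // Returns the timeout actually used, so that the override is observable.
    long close(long timeoutMs) {
        boolean fromCallback = Thread.currentThread() == ioThread;
        long effectiveMs = timeoutMs;
        if (timeoutMs > 0 && fromCallback) {
            effectiveMs = 0; // a self-join could never succeed, so do not wait at all
        }
        if (effectiveMs > 0) {
            running.set(false);                 // graceful path: let the loop exit
            joinQuietly(ioThread, effectiveMs);
        }
        if (ioThread.isAlive()) {
            forceClosed.set(true);              // a real producer would discard pending messages here
            running.set(false);
            if (!fromCallback) joinQuietly(ioThread, 0); // 0 means wait without a limit
        }
        return effectiveMs;
    }

    boolean wasForceClosed() { return forceClosed.get(); }

    void awaitTermination() { joinQuietly(ioThread, 0); }

    private static void joinQuietly(Thread t, long millis) {
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
        }
    }
}

class CloseDemo {
    // close(1000) invoked on the sender thread itself.
    static long closeFromCallback() {
        ToyProducer p = new ToyProducer();
        AtomicLong used = new AtomicLong(-1);
        p.submitCallback(() -> used.set(p.close(1000)));
        p.awaitTermination();
        return used.get();
    }

    // close(1000) invoked from an ordinary user thread.
    static long closeFromUserThread() {
        ToyProducer p = new ToyProducer();
        return p.close(1000);
    }

    public static void main(String[] args) {
        System.out.println("timeout used from callback: " + closeFromCallback());
        System.out.println("timeout used from user thread: " + closeFromUserThread());
    }
}
```

In this toy, the callback path skips the join entirely and goes straight to the force-close branch, while the user-thread path keeps its timeout and normally never reaches that branch.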



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128154>

    May want to log that we are proceeding to a force close



clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
<https://reviews.apache.org/r/31850/#comment128155>

    until timeout is expired -> within the specified timeout. If the close does not complete within the timeout, discard any pending messages and force close the producer.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128166>

    As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we can figure out a way to avoid leaking details of batch completion (which is currently exclusively in sender) into the RecordAccumulator.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128200>

    I don't think we should overload InterruptException for this. InterruptException is a wrapper around InterruptedException. i.e., after an InterruptException the thread should in fact have been interrupted - i.e., the interrupt status of the thread should be true (which is not the case here).
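
To make the contract in this comment concrete, here is a small sketch, assuming a hypothetical `ToyInterruptException` rather than the real class in org.apache.kafka.common.errors. It shows a wrapper that restores the interrupt status when it is constructed, so that a caller who catches it still sees the thread as interrupted. Failing a send with a plain IllegalStateException, one alternative that came up in this thread, leaves the status untouched.

```java
// Hypothetical unchecked wrapper that follows the contract described above.
class ToyInterruptException extends RuntimeException {
    ToyInterruptException(InterruptedException cause) {
        super(cause);
        // Throwing InterruptedException clears the flag; set it again.
        Thread.currentThread().interrupt();
    }
}

class InterruptDemo {
    // Interrupt status seen by a caller that catches the wrapper.
    static boolean statusAfterWrappedInterrupt() {
        Thread.currentThread().interrupt();          // simulate a pending interrupt
        try {
            try {
                Thread.sleep(10);                    // throws at once and clears the flag
            } catch (InterruptedException e) {
                throw new ToyInterruptException(e);  // constructor restores the flag
            }
            return false;                            // only reached if sleep did not throw
        } catch (ToyInterruptException e) {
            return Thread.interrupted();             // read the flag and clear it again
        }
    }

    // Interrupt status after an abort that was not caused by an interrupt.
    static boolean statusAfterAbort() {
        try {
            throw new IllegalStateException("producer closed (hypothetical message)");
        } catch (IllegalStateException e) {
            return Thread.currentThread().isInterrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("after wrapper: " + statusAfterWrappedInterrupt());
        System.out.println("after abort:   " + statusAfterAbort());
    }
}
```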



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment128167>

    See comment above.



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
<https://reviews.apache.org/r/31850/#comment128169>

    Can you revert this? i.e., I think the previous version with locally declared accums is cleaner.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment128171>

    Can you also add a test for calling close with a non-zero timeout in the callback?


- Joel Koshy


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79107
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128201>

    send -> previously sent


- Guozhang Wang


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 10, 2015, 4:36 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 394
> > <https://reviews.apache.org/r/31850/diff/7/?file=921104#file921104line394>
> >
> >     I was trying to find a case where it wouldn't work, but I think it works as required.
> >     
> >     - Client thread 1 calls close
> >     - Client thread 2 calls append _before_ the accumulator is closed and reaches at or after line 177
> >     - Client thread 1 marks the accumulator as closed
> >     - Sender thread comes to this point and aborts/clears batches.
> >     - Client thread 2 allocates and returns a new batch (and decrements the appendsInProgress count)
> >     - Sender thread checks appendsInProgress which returns false
> >     - Which is why we need the additional abortBatches after the loop.
> >     
> >     It is tricky though. I'm wondering if the following would work and is simpler/clearer: make the post-condition of close be (i) the accumulator closed flag is true && (ii) there are no pending appends.
> >     
> >     IOW in accumulator.close, set the flag to true and then wait until there are no appendsInProgress. Do you think that would work?

I talked with Joel offline: blocking on close has issues if close(0) is called from a callback. I added a comment to explain the tricky synchronization.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79677
-----------------------------------------------------------


On April 10, 2015, 10:09 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79677
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129197>

    appendsInProgress



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129198>

    appending



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129199>

    I was trying to find a case where it wouldn't work, but I think it works as required.
    
    - Client thread 1 calls close
    - Client thread 2 calls append _before_ the accumulator is closed and reaches at or after line 177
    - Client thread 1 marks the accumulator as closed
    - Sender thread comes to this point and aborts/clears batches.
    - Client thread 2 allocates and returns a new batch (and decrements the appendsInProgress count)
    - Sender thread checks appendsInProgress which returns false
    - Which is why we need the additional abortBatches after the loop.
    
    It is tricky though. I'm wondering if the following would work and is simpler/clearer: make the post-condition of close be (i) the accumulator closed flag is true && (ii) there are no pending appends.
    
    IOW in accumulator.close, set the flag to true and then wait until there are no appendsInProgress. Do you think that would work?
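
The interleaving walked through above can be illustrated with a simplified model. The sketch below is an assumption-laden toy, not the RecordAccumulator code: `ToyAccumulator` and `AccumulatorDemo` are hypothetical, batches are plain strings, and an AtomicInteger is just one way to count appends in progress. It shows why one more abort is needed after the loop: a client thread that passed the closed check may add its batch after the last abort inside the loop but before the counter is observed to be zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified accumulator; batches are plain strings.
class ToyAccumulator {
    private final List<String> batches = new ArrayList<>();
    private final List<String> aborted = new ArrayList<>();
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);
    private volatile boolean closed = false;

    void append(String record) {
        appendsInProgress.incrementAndGet();   // announce the append before checking the flag
        try {
            if (closed) throw new IllegalStateException("cannot append after close");
            synchronized (batches) { batches.add(record); }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    void close() { closed = true; }

    // Must be called after close(); models the sender's force-close path.
    void abortIncompleteBatches() {
        do {
            abortBatches();
        } while (appendsInProgress.get() > 0);
        abortBatches(); // picks up a batch added between the last abort and the counter reaching 0
    }

    private void abortBatches() {
        synchronized (batches) {
            aborted.addAll(batches);
            batches.clear();
        }
    }

    int abortedCount() { synchronized (batches) { return aborted.size(); } }
    int pendingCount() { synchronized (batches) { return batches.size(); } }
}

class AccumulatorDemo {
    // Races appends against close; returns {aborted, rejected, pending}.
    static int[] race(int threads, int perThread) {
        ToyAccumulator acc = new ToyAccumulator();
        AtomicInteger rejected = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    try {
                        acc.append("r");
                    } catch (IllegalStateException e) {
                        rejected.incrementAndGet();
                    }
                }
            });
            workers[t].start();
        }
        acc.close();
        acc.abortIncompleteBatches();
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { acc.abortedCount(), rejected.get(), acc.pendingCount() };
    }

    public static void main(String[] args) {
        int[] r = race(4, 10_000);
        System.out.println("aborted=" + r[0] + " rejected=" + r[1] + " pending=" + r[2]);
    }
}
```

In this model every append is either rejected with an exception or ends up in the aborted list, so nothing is left pending once the worker threads have finished. The split between aborted and rejected varies from run to run.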


- Joel Koshy


On April 8, 2015, 9:01 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 9:01 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review83433
-----------------------------------------------------------

Ship it!


Minor comments - I will address these on check-in.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment134421>

    before marking it as done.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment134418>

    This tests idempotence of the close call.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment134417>

    `100 -> %d, (i + 1) * numRecords`


- Joel Koshy


On April 30, 2015, 12:37 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 30, 2015, 12:37 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Change java doc as Jay suggested.
> 
> 
> Go back to the AtomicInteger approach for less dependency.
> 
> 
> Rebased on trunk
> 
> 
> Add some missing code from rebase.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated May 12, 2015, 9:29 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Change java doc as Jay suggested.


Go back to the AtomicInteger approach for less dependency.


Rebased on trunk


Add some missing code from rebase.


Rebased on trunk and incorporated Joel's comments


Rebased on trunk and incorporated Joel's comments


Rebased on trunk and incorporated Joel's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 16a67a2a5d2a62dd933c53749221e19c5019524b 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review82402
-----------------------------------------------------------

Ship it!


Ship It!

- Jay Kreps


On April 30, 2015, 12:37 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 30, 2015, 12:37 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Change java doc as Jay suggested.
> 
> 
> Go back to the AtomicInteger approach for less dependency.
> 
> 
> Rebased on trunk
> 
> 
> Add some missing code from rebase.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 30, 2015, 12:37 a.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Change java doc as Jay suggested.


Go back to the AtomicInteger approach for less dependency.


Rebased on trunk


Add some missing code from rebase.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 29, 2015, 11:58 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Change java doc as Jay suggested.


Go back to the AtomicInteger approach for less dependency.


Rebased on trunk


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 49a98838767615dd952da20825f6985698137710 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c44054038066f0d0829d05f082b2ee42b34cded7 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java baa48e7c1b7ac5da8f3aca29f653c3fff88f8009 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 21, 2015, 12:38 a.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Changed javadoc per Jay's suggestion


Change java doc as Jay suggested.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b70e1a3d406338d4b9ddd6188d2820e87545a9b6 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157
> > <https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157>
> >
> >     Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact.
> 
> Jiangjie Qin wrote:
>     Hi Jay, I looked into the ReentrantReadWriteLock implementation, and under the hood it seems to use compare-and-set, which should perform similarly to an atomic integer. I agree, though, that this depends heavily on the implementation.
>     I modified o.a.k.clients.tools.ProducerPerformance slightly to make it multithreaded. Throughput is very similar in the following test settings, about 1M messages/second in each case with a target of 10M messages/second:
>     1. 10 threads with the latest trunk
>     2. 10 threads using AtomicInteger
>     3. 10 threads using ReentrantReadWriteLock
>     When I increase the thread count to 50, throughput drops to about 0.82M messages/second in all cases.
>     It seems the read lock does not introduce a performance issue.

Hey Jay, do you have any other performance tests in mind that we should run?


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review80753
-----------------------------------------------------------


On April 21, 2015, 12:38 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 21, 2015, 12:38 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Changed javadoc per Jay's suggestion
> 
> 
> Change java doc as Jay suggested.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b70e1a3d406338d4b9ddd6188d2820e87545a9b6 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 526
> > <https://reviews.apache.org/r/31850/diff/9/?file=931818#file931818line526>
> >
> >     I cleaned up this javadoc a little bit to try to simplify things. With docs it is always hard to get the level of focus right so that it has the essential information but doesn't overload the user and obscure the primary thing.
> >     
> >     1. Changed "messages" to "records" (that is the terminology in the new clients).
> >     2. I significantly shortened the section on calling from within a callback. I think only 0.00001% of people would ever consider this.
> >     3. I cannot think of a reason why the user would care if the I/O thread is synchronously shutdown or not, and we don't make any promises one way or the other in the main close method, so let's just leave that bit out.
> >     
> >     What do you think of this:
> >        /**
> >          * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
> >          * <p>
> >          * If the producer is unable to complete all requests before the timeout expires, this method will fail 
> >     	 * any unsent and unacknowledged records immediately.
> >          * <p>
> >          * If invoked from within a {@link Callback} this method will not block and will be equivalent to <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while blocking the I/O thread of the producer.
> >     	 * 
> >          * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
> >          *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
> >          * @param timeUnit The time unit for the <code>timeout</code>
> >          * @throws InterruptException If the thread is interrupted while blocked
> >          * @throws IllegalArgumentException If the <code>timeout</code> is negative.
> >          */

Looks good and it is easier to understand from user point of view. Thanks for cleaning this up.


> On April 20, 2015, 5:30 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157
> > <https://reviews.apache.org/r/31850/diff/9/?file=931821#file931821line157>
> >
> >     Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact.

Hi Jay, I looked into the ReentrantReadWriteLock implementation, and under the hood it seems to use compare-and-set, which should perform similarly to an atomic integer. I agree, though, that this depends heavily on the implementation.
I modified o.a.k.clients.tools.ProducerPerformance slightly to make it multithreaded. Throughput is very similar in the following test settings, about 1M messages/second in each case with a target of 10M messages/second:
1. 10 threads with the latest trunk
2. 10 threads using AtomicInteger
3. 10 threads using ReentrantReadWriteLock
When I increase the thread count to 50, throughput drops to about 0.82M messages/second in all cases.
It seems the read lock does not introduce a performance issue.
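
For anyone who wants to poke at the guard cost in isolation, here is a rough sketch of the kind of comparison described above. It is not the modified ProducerPerformance tool and not a rigorous benchmark (no warm-up, no real sends); the class GuardCostSketch and its method names are made up for illustration. It only times N threads entering and leaving either an AtomicInteger guard or a read lock.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical micro-comparison; names are illustrative, not from the patch.
final class GuardCostSketch {

    /** Runs guardedOp opsPerThread times on each of `threads` threads; returns elapsed nanoseconds. */
    static long timeNanos(int threads, int opsPerThread, Runnable guardedOp) {
        Thread[] workers = new Thread[threads];
        long start = System.nanoTime();
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < opsPerThread; j++)
                    guardedOp.run();
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        AtomicInteger inProgress = new AtomicInteger();
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        // Guard 1: count the append in and out with an atomic integer.
        Runnable atomicGuard = () -> {
            inProgress.incrementAndGet();
            inProgress.decrementAndGet();
        };
        // Guard 2: take and release the shared (read) lock.
        Runnable readLockGuard = () -> {
            lock.readLock().lock();
            lock.readLock().unlock();
        };

        for (int threads : new int[] {10, 50}) {
            long a = timeNanos(threads, 1_000_000, atomicGuard);
            long r = timeNanos(threads, 1_000_000, readLockGuard);
            System.out.printf("%d threads: atomic=%d ms, readLock=%d ms%n",
                    threads, a / 1_000_000, r / 1_000_000);
        }
    }
}
```

Numbers from a loop like this say little about end-to-end producer throughput, where the guard is a small part of each send, so treat them as a sanity check only.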


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review80753
-----------------------------------------------------------


On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 16, 2015, 6:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review80753
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment130866>

    I cleaned up this javadoc a little bit to try to simplify things. With docs it is always hard to get the level of focus right so that it has the essential information but doesn't overload the user and obscure the primary thing.
    
    1. Changed "messages" to "records" (that is the terminology in the new clients).
    2. I significantly shortened the section on calling from within a callback. I think only 0.00001% of people would ever consider this.
    3. I cannot think of a reason why the user would care if the I/O thread is synchronously shutdown or not, and we don't make any promises one way or the other in the main close method, so let's just leave that bit out.
    
    What do you think of this:
       /**
         * This method waits up to <code>timeout</code> for the producer to complete the sending of all incomplete requests.
         * <p>
         * If the producer is unable to complete all requests before the timeout expires, this method will fail 
    	 * any unsent and unacknowledged records immediately.
         * <p>
         * If invoked from within a {@link Callback} this method will not block and will be equivalent to <code>close(0, TimeUnit.MILLISECONDS)</code>. This is done since no further sending will happen while blocking the I/O thread of the producer.
    	 * 
         * @param timeout The maximum time to wait for producer to complete any pending requests. The value should be
         *                non-negative. Specifying a timeout of zero means do not wait for pending send requests to complete.
         * @param timeUnit The time unit for the <code>timeout</code>
         * @throws InterruptException If the thread is interrupted while blocked
         * @throws IllegalArgumentException If the <code>timeout</code> is negative.
         */



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment130868>

    Read locks are very expensive. I am pretty worried about this. If we want to do this we need to do a pretty detailed examination of the perf impact.


- Jay Kreps


On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 16, 2015, 6:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 16, 2015, 6:35 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Addressed Jay's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> >

Thanks for the review, Jay. Please see the reply below.


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 530
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line530>
> >
> >     I think this section is very confusing. I don't think most people will differentiate between immediately exiting vs waiting for 0 ms and then exiting, since after all isn't waiting 0 ms the same as immediately exiting.

The point we want to convey here is that when timeout = 0, the behavior differs depending on the context. If the method is invoked from a user thread, it will try to join the sender thread. If it is invoked from the sender thread, it won't try to join itself. That is what we meant by "immediately".
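
To make the distinction concrete, here is a minimal sketch of the join decision, assuming a stand-in class (CloseContextSketch, hypothetical) rather than the real KafkaProducer. The I/O thread is simulated by a loop that spins until a stop flag is set, and the closeInsideCallback switch simulates a user callback that calls close() on the I/O thread.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the producer; this is not the KafkaProducer code.
final class CloseContextSketch {
    private volatile boolean stopRequested = false;
    // Result of close() when it was invoked on the I/O thread; null until that happens.
    volatile Boolean callbackCloseJoined = null;
    private final Thread ioThread;

    CloseContextSketch(boolean closeInsideCallback) {
        ioThread = new Thread(() -> {
            if (closeInsideCallback) {
                // Simulates a user callback calling close(timeout) on the I/O thread.
                callbackCloseJoined = close(5, TimeUnit.SECONDS);
            }
            while (!stopRequested)
                Thread.yield();
        }, "sketch-io-thread");
        ioThread.start();
    }

    /** Returns true if the caller joined the I/O thread, false if the join was skipped. */
    boolean close(long timeout, TimeUnit unit) {
        if (timeout < 0)
            throw new IllegalArgumentException("The timeout cannot be negative.");
        boolean invokedFromIoThread = Thread.currentThread() == ioThread;
        stopRequested = true;
        if (invokedFromIoThread) {
            // A thread cannot usefully join itself, so return without waiting,
            // regardless of the timeout that was passed in.
            return false;
        }
        try {
            // Note: Thread.join(0) waits until the thread dies. That is fine in this
            // sketch because the stop flag is already set and the loop exits promptly.
            ioThread.join(unit.toMillis(timeout));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while closing", e);
        }
        return true;
    }
}
```

Calling close(0, TimeUnit.MILLISECONDS) from a user thread returns true here (it joined), while the same call made on the I/O thread returns false.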


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 539
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539>
> >
> >     It wouldn't block forever, that isn't correct, it would just block for the period of time they specified.

We are saying that when close(timeout) is invoked from the sender thread, we call close(0) instead. We do this to *avoid* blocking forever.


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 157
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line157>
> >
> >     I think to be correct the check for whether the producer is closed should happen before we consider an append in progress since you loop on that check later.

Yeah, the current solution assumes that once a thread has received an IllegalStateException because the producer is closed, it won't call send() again.
The problem with putting the close check before incrementing appendsInProgress is this: what if close is invoked from another thread after the close-flag check but before appendsInProgress is incremented? In that case we might miss this last message or batch.


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 155
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155>
> >
> >     This scheme is clever but non-obvious, is there a simpler way?

I'm not sure if there is a simpler way. Maybe we can review the current approach again and see if we can simplify it.

The goals we want to achieve here are:
1. When abortIncompleteBatches() finishes, no more messages should be appended.
2. When hasUnsent() returns false, it must not have missed any batch.

The current solutions for both depend on setting the close flag first.
To achieve (1), the implementation sets the close flag first and then waits until all ongoing appends (if any) finish.
To achieve (2), the implementation synchronizes on the deque. When an append grabs the deque lock, it first checks whether the close flag is set. If it is set, hasUnsent() might already have checked this deque, so it is no longer safe to append a new batch. Otherwise it is safe to append a new batch.
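
A much-simplified sketch of goals (1) and (2) may help when reviewing the scheme. It is not the RecordAccumulator code: ClosableAccumulatorSketch is a hypothetical class with a single deque of strings, and it leaves out buffer allocation and per-partition deques. What it keeps is the ordering: register the append in the counter first, then check the close flag under the deque lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified accumulator; names are illustrative only.
final class ClosableAccumulatorSketch {
    private final Deque<String> deque = new ArrayDeque<>();
    private final AtomicInteger appendsInProgress = new AtomicInteger(0);
    private volatile boolean closed = false;

    /** Appends a record, or throws if the accumulator has been closed. */
    void append(String record) {
        // Register first, then check the flag. A concurrent closer that sets the flag
        // and then reads the counter either sees this append and waits for it, or this
        // append sees the flag and is rejected.
        appendsInProgress.incrementAndGet();
        try {
            synchronized (deque) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                deque.addLast(record);
            }
        } finally {
            appendsInProgress.decrementAndGet();
        }
    }

    /** Sets the close flag; must happen before hasUnsent() or abortIncompleteBatches() is relied on. */
    void close() {
        closed = true;
    }

    /** Goal (2): with the flag set, a false result cannot have missed a batch. */
    boolean hasUnsent() {
        synchronized (deque) {
            return !deque.isEmpty();
        }
    }

    /** Goal (1): once this returns, nothing more can be appended. Returns the number of aborted records. */
    int abortIncompleteBatches() {
        closed = true;
        // Wait for appends that registered before they could observe the flag.
        while (appendsInProgress.get() > 0)
            Thread.yield();
        synchronized (deque) {
            int aborted = deque.size();
            deque.clear();
            return aborted;
        }
    }
}
```

In this reduced form the deque lock alone would be enough; the counter matters in the real code because an append does work outside the deque lock.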


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 176
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line176>
> >
> >     Can you explain this check? I don't think this actually fixes things as the close could happen after the check.

Please see previous reply.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
-----------------------------------------------------------


On April 10, 2015, 10:09 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 155
> > <https://reviews.apache.org/r/31850/diff/8/?file=923927#file923927line155>
> >
> >     This scheme is clever but non-obvious, is there a simpler way?
> 
> Jiangjie Qin wrote:
>     I'm not sure if there is a simpler way. Maybe we can review the current approach again and see if we can simplify it.
>     
>     The goals we want to achieve here are:
>     1. When abortIncompleteBatches() finishes, no more messages should be appended.
>     2. When hasUnsent() returns false, it must not have missed any batch.
>     
>     The current solutions for both depend on setting the close flag first.
>     To achieve (1), the implementation sets the close flag first and then waits until all ongoing appends (if any) finish.
>     To achieve (2), the implementation synchronizes on the deque. When an append grabs the deque lock, it first checks whether the close flag is set. If it is set, hasUnsent() might already have checked this deque, so it is no longer safe to append a new batch. Otherwise it is safe to append a new batch.

I thought about this again and went back to using the ReaderWriterLock, with a small modification that uses tryLock. I hope it makes things cleaner. The idea is exactly the same as the current approach, but the code is less confusing. I put some reasoning below; please let me know if you have any suggestions.

Essentially, we want to make sure no messages or batches are left behind after either of these events:
1. abortIncompleteBatches() has been called
2. hasUnsent() has returned false (hasUnsent() only cares about batches, not messages)
This means we need to make sure append will not proceed after these two events.

We set the close flag before these two events to reject appends. The key issue we want to solve here is how to deal with threads that are already appending.

To detect whether there are appending threads, we need either an explicit flag or an exclusive lock (the ReaderWriterLock, as in the previous patch).
If appends are in progress, we have two options:
A. Fail the append.
B. Wait until the appends are done.

It is a little bit difficult to fail the append because it is hard to know which step the append is in. hasUnsent() uses the deque lock to make sure no new batch can be added to a deque after hasUnsent() has checked it. That's why append needs to check the close flag again after grabbing the deque lock.
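
To make the re-check under the deque lock concrete, here is a minimal sketch. It is not the patch code: the class DequeCloseSketch, its single deque of String batches, and the method bodies are simplifications assumed purely for illustration (the real accumulator keeps one deque per partition).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, simplified stand-in for the accumulator; not the patch code. */
class DequeCloseSketch {
    private final Deque<String> deque = new ArrayDeque<>();
    private volatile boolean closed = false;

    /** Returns false if the accumulator was closed before the batch could be added. */
    boolean append(String batch) {
        if (closed) {
            return false; // fast-path check; not sufficient on its own
        }
        synchronized (deque) {
            // Re-check under the deque lock: if close() ran in between,
            // hasUnsent() may already have inspected this deque.
            if (closed) {
                return false;
            }
            deque.addLast(batch);
            return true;
        }
    }

    /** Sets the close flag; must happen before hasUnsent() is relied upon. */
    void close() {
        closed = true;
    }

    /** After close(), no append can add a batch once this has released the deque lock. */
    boolean hasUnsent() {
        synchronized (deque) {
            return !deque.isEmpty();
        }
    }
}
```

Because the flag is set before hasUnsent() takes the lock, any append that acquires the lock afterwards sees the flag and backs off.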

For abortIncompleteBatches, we currently use option B. One tricky thing here is that an appending thread might be blocked on a full buffer while abortIncompleteBatches is waiting. This would lead to deadlock if abortIncompleteBatches is called from the sender thread: the sender thread releases no memory while it waits for the append, and the append is waiting for memory. This means we need to keep the sender thread running to release memory. To solve this issue, instead of blocking on acquiring the write lock, we use tryLock; whenever it returns false, we keep failing batches to release memory until we grab the write lock successfully. Then we abort batches one last time.

What do you think about this approach?
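
A minimal sketch of the tryLock idea described above, under stated assumptions: TryLockAbortSketch, its queue of String batches, and the aborted counter are hypothetical names used only for illustration, and "failing a batch" is reduced to removing it from the queue. The actual patch may differ.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch of close-with-tryLock; not the patch code. */
class TryLockAbortSketch {
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final Queue<String> batches = new ConcurrentLinkedQueue<>();
    private volatile boolean closed = false;
    private int aborted = 0; // only touched by the aborting thread

    /** Appends hold the read lock, so many appends can run concurrently. */
    boolean append(String batch) {
        closeLock.readLock().lock();
        try {
            if (closed) {
                return false;
            }
            batches.add(batch); // the real append may block here waiting for buffer memory
            return true;
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /** Returns the number of batches that were failed. */
    int abortIncompleteBatches() {
        closed = true; // reject new appends first
        // While some append still holds the read lock, keep failing batches so that
        // memory is released and a blocked append can finish.
        while (!closeLock.writeLock().tryLock()) {
            abortBatches();
            Thread.yield();
        }
        try {
            abortBatches(); // final pass: no append is in progress any more
        } finally {
            closeLock.writeLock().unlock();
        }
        return aborted;
    }

    private void abortBatches() {
        while (batches.poll() != null) {
            aborted++;
        }
    }
}
```

The loop never blocks on the write lock, so the thread that calls abortIncompleteBatches keeps making progress even when an append is stuck waiting for memory.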


> On April 11, 2015, 8:02 p.m., Jay Kreps wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 539
> > <https://reviews.apache.org/r/31850/diff/8/?file=923924#file923924line539>
> >
> >     It wouldn't block forever, that isn't correct, it would just block for the period of time they specified.
> 
> Jiangjie Qin wrote:
>     We are saying that we will call close(0) instead of letting the sender thread call close(timeout), and we do this to *avoid* blocking forever.

I removed the comment to avoid confusion.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
-----------------------------------------------------------


On April 16, 2015, 6:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 16, 2015, 6:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Addressed Jay's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79813
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment129396>

    Minor improvement: Use the javadoc reference for callback {@link Callback} to avoid confusion here (there are many kinds of callbacks).



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment129397>

    Minor nit: I think "send request" references our internal class name. I think substituting "request" would be more clear.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment129398>

    I think this section is very confusing. I don't think most people will differentiate between exiting immediately and waiting for 0 ms and then exiting. After all, isn't waiting 0 ms the same as exiting immediately?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment129399>

    It wouldn't block forever, that isn't correct, it would just block for the period of time they specified.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment129400>

    This is not a deadlock, just a useless blocking.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129401>

    This scheme is clever but non-obvious, is there a simpler way?



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129402>

    I think to be correct the check for whether the producer is closed should happen before we consider an append in progress since you loop on that check later.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment129403>

    Can you explain this check? I don't think this actually fixes things as the close could happen after the check.


- Jay Kreps


On April 10, 2015, 10:09 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79761
-----------------------------------------------------------

Ship it!


- Guozhang Wang


On April 10, 2015, 10:09 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 10, 2015, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Addressed Joel's comments
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 10, 2015, 10:09 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Addressed Joel's comments


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 8, 2015, 9:01 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Addressed Joel's comments.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 8, 2015, 6:36 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 153
> > <https://reviews.apache.org/r/31850/diff/6/?file=920399#file920399line153>
> >
> >     Thanks for catching this issue, but can you explain it more clearly in the comment? i.e., "append is atomic to close" does not really make sense and the "last batch is missed" is not fully explained. More importantly, Guozhang found an issue with the locking approach that he can comment on.
> >     
> >     Also, general comment on the approach: it is slightly weird to see the closeLock in the code. I'm wondering if we really need to bother with it. i.e., sure there may be some futures returned to the client, but once close has been called, the client probably should not bother to call future.get. Perhaps that is not a valid assumption if they check request satisfaction in separate threads.

Discussed offline. The ReaderWriterLock might cause a deadlock between the sender thread and the user thread when memory is full. I will submit another patch to address this.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79379
-----------------------------------------------------------


On April 8, 2015, 1:18 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 1:18 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.

> On April 8, 2015, 6:36 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 381
> > <https://reviews.apache.org/r/31850/diff/6/?file=920399#file920399line381>
> >
> >     Similar comment as above. Once all accesses of closed are protected by the lock then we should perhaps remove the volatile qualifier.

Actually, I just learned from the purgatory refactoring patch that in Java a read-write lock may not necessarily provide a memory barrier? http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79379
-----------------------------------------------------------


On April 8, 2015, 9:01 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 9:01 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On April 8, 2015, 6:36 p.m., Joel Koshy wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 381
> > <https://reviews.apache.org/r/31850/diff/6/?file=920399#file920399line381>
> >
> >     Similar comment as above. Once all accesses of closed are protected by the lock then we should perhaps remove the volatile qualifier.
> 
> Guozhang Wang wrote:
>     Actually, I just learned from the purgatory refactoring patch that in Java a read-write lock may not necessarily provide a memory barrier? http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html

Interesting problem. But why is this related to ReaderWriterLock? I checked the ReentrantReadWriteLock implementation, and it actually uses compareAndSet. Did I miss anything?
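
For reference, a minimal sketch of the pattern under discussion: a non-volatile flag whose every access is guarded by a ReentrantReadWriteLock. The class name GuardedFlagSketch is hypothetical. As far as I know, the java.util.concurrent.locks.Lock documentation requires implementations to provide the same memory synchronization semantics as the built-in monitor lock, so a write made under the write lock should be visible to a later read under the read lock. The double-checked locking article linked above concerns reads made without holding any lock, which this sketch avoids.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: a flag that relies on the lock, not on volatile, for visibility. */
class GuardedFlagSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean closed = false; // deliberately not volatile: every access holds the lock

    void close() {
        lock.writeLock().lock();
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isClosed() {
        lock.readLock().lock();
        try {
            return closed;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

If any code path reads the flag without taking the lock, the volatile qualifier would still be needed for that path.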


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79379
-----------------------------------------------------------


On April 8, 2015, 9:01 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 9:01 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Addressed Joel's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79379
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128657>

    joint -> join



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128660>

    Just to be clear here, I think we can just say "fail any pending send requests". That is equivalent to closing forcefully. The issue is that after this we try to join the sender thread (if called from user thread) so that is not quite closing forcefully. That is actually a graceful close.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128661>

    so then this becomes:
    
    When timeout = 0, this method fails all pending send requests and:
    <ul>
    <li> if the method was invoked from the user thread, it will wait for the sender thread to gracefully exit.</li>
    <li> if the method was invoked from the producer callback, it will return immediately without waiting for the sender thread to exit.</li>
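
A minimal sketch of the timeout = 0 behavior described in the list above, assuming a hypothetical ProducerCloseSketch class with a stand-in sender thread. Only the thread check reflects the discussion; failing pending requests is reduced to flipping a flag, and none of this is the actual KafkaProducer code.

```java
import java.util.concurrent.locks.LockSupport;

/** Hypothetical stand-in for the producer; only the thread check mirrors the discussion. */
class ProducerCloseSketch {
    private volatile boolean running = true;

    // Stand-in for the sender (I/O) thread: it idles until asked to stop.
    private final Thread sender = new Thread(() -> {
        while (running) {
            LockSupport.parkNanos(1_000_000L); // pause roughly 1 ms between checks
        }
    }, "sketch-sender");

    ProducerCloseSketch() {
        sender.setDaemon(true);
        sender.start();
    }

    /**
     * Models close with timeout = 0.
     * Returns true if the caller waited for the sender thread to exit.
     */
    boolean closeWithZeroTimeout() {
        running = false; // stands in for failing all pending send requests
        if (Thread.currentThread() == sender) {
            // Invoked from a callback on the sender thread: joining our own
            // thread would never return, so return immediately.
            return false;
        }
        try {
            sender.join(); // invoked from a user thread: wait for a graceful exit
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    boolean senderAlive() {
        return sender.isAlive();
    }
}
```

Calling closeWithZeroTimeout() from a user thread returns only after the stand-in sender thread has exited.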



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128662>

    max -> maximum
    for _the_ producer _to_ complete _any pending_ send requests.
    non negative -> non-negative



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128664>

    Specifying a timeout of zero means do not wait for pending send requests to complete.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128666>

    This should probably be info



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128665>

    This should probably be moved to the else block.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128667>

    Can we make this clearer? e.g., "Proceeding to force close the producer since pending requests could not be completed within timeout {}..."



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128668>

    Thanks for catching this issue, but can you explain it more clearly in the comment? i.e., "append is atomic to close" does not really make sense and the "last batch is missed" is not fully explained. More importantly, Guozhang found an issue with the locking approach that he can comment on.
    
    Also, general comment on the approach: it is slightly weird to see the closeLock in the code. I'm wondering if we really need to bother with it. i.e., sure there may be some futures returned to the client, but once close has been called, the client probably should not bother to call future.get. Perhaps that is not a valid assumption if they check request satisfaction in separate threads.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128671>

    Similar comments as above.
    
    Also, since this is public we should probably still acquire the read lock.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128673>

    Similar comment as above. Once all accesses of closed are protected by the lock then we should perhaps remove the volatile qualifier.


- Joel Koshy


On April 8, 2015, 1:18 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated April 8, 2015, 1:18 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Addressed Joel and Guozhang's comments.
> 
> 
> rebased on trunk
> 
> 
> Rebase on trunk
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated April 8, 2015, 1:18 a.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15.


Addressed Joel and Guozhang's comments.


rebased on trunk


Rebase on trunk


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b91e2c52ed0acb1faa85915097d97bafa28c413a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 0e7ab29a07301309aa8ca5b75b32b8e05ddc3a94 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 05e2929c0a9fc8cf2a8ebe0776ca62d5f3962d5c 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 9811a2b2b1e9bf1beb301138f7626e12d275a8db 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79094
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128193>

    This comment is not correct, as we will still wait on joining the sender thread.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128190>

    Also add [NOTE] for this?


- Guozhang Wang


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated March 27, 2015, 11:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated March 27, 2015, 11:35 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Patch for the finally passed KIP-15.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated March 25, 2015, 5:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Modify according to the latest conclusion.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75968
-----------------------------------------------------------

Ship it!


LGTM. Just a minor comment on one of your replies.

- Guozhang Wang


On March 9, 2015, 7:56 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated March 9, 2015, 7:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75742
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment122980>

    I think so. It is synchronized internally and idempotent, from what I can see.



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment122994>

    I thought about this again. Actually, we should not invoke initiateClose in producer.close(timeout) when the timeout is negative. I'll change the code in close(timeout) and leave this code as is. That is also clearer because, from a semantic point of view, forceClose should do everything.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment122997>

    Probably not, because in this case the producer is closed in each iteration, so we have to create a new one.


- Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated March 9, 2015, 7:56 p.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Incorporated Guozhang's comments.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.

> On March 9, 2015, 6:37 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 418
> > <https://reviews.apache.org/r/31850/diff/2/?file=888886#file888886line418>
> >
> >     This is not related to this ticket, but I think we can just throw e here as well.
> 
> Jiangjie Qin wrote:
>     The InterruptException is a RuntimeException wrapper around InterruptedException (a little bit confusing...). I think InterruptException was intentionally thrown here instead of InterruptedException.

This is very confusing; I am wondering why we have to throw a checked exception here. Anyway, if we are sticking with InterruptException, I think it is better to just throw the exception with e.message if it is null, without wrapping the original exception.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75731
-----------------------------------------------------------


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.

> On March 9, 2015, 6:37 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 565
> > <https://reviews.apache.org/r/31850/diff/2/?file=888886#file888886line565>
> >
> >     Could metrics.close() be called simultaneously?

I think so. It is synchronized internally and idempotent, from what I can see.
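
To illustrate what "synchronized and idempotent" buys us when close() may be called from two threads at once, here is a minimal sketch. It is not the actual Metrics code: the Registry class, its closed flag, and timesReleased() are hypothetical names used only for this example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class IdempotentCloseSketch {
    // Hypothetical stand-in for a metrics registry; not Kafka's Metrics class.
    static class Registry {
        private boolean closed = false;
        private final AtomicInteger releases = new AtomicInteger();

        // synchronized serializes concurrent callers; the closed flag turns
        // every call after the first into a no-op (idempotent close).
        synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;
            releases.incrementAndGet(); // stands in for releasing resources
        }

        int timesReleased() {
            return releases.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Registry registry = new Registry();
        Thread first = new Thread(registry::close);
        Thread second = new Thread(registry::close);
        first.start();
        second.start();
        first.join();
        second.join();
        // Resources are released once no matter which thread wins the race.
        System.out.println(registry.timesReleased());
    }
}
```

With both properties in place, a simultaneous second call is harmless: it blocks briefly and then returns without doing anything.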


> On March 9, 2015, 6:37 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 418
> > <https://reviews.apache.org/r/31850/diff/2/?file=888886#file888886line418>
> >
> >     This is not related to this ticket, but I think we can just throw e here as well.

The InterruptException is a RuntimeException wrapper around InterruptedException (a little bit confusing...). I think InterruptException was intentionally thrown here instead of InterruptedException.
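
Since the two names are easy to mix up, here is a rough sketch of the pattern: a checked InterruptedException caught and rethrown as an unchecked wrapper, so the method needs no throws clause. UncheckedInterrupt and awaitQuietly are hypothetical names for this example; the real org.apache.kafka.common.errors.InterruptException may differ in detail.

```java
class InterruptWrapperSketch {
    // Hypothetical unchecked wrapper, modeled only on the description above.
    static class UncheckedInterrupt extends RuntimeException {
        UncheckedInterrupt(InterruptedException cause) {
            super(cause);
        }
    }

    // No "throws InterruptedException" clause is needed because the checked
    // exception is converted into the unchecked wrapper.
    static void awaitQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new UncheckedInterrupt(e);
        }
    }

    public static void main(String[] args) {
        // Set the interrupt flag so that sleep() fails immediately.
        Thread.currentThread().interrupt();
        try {
            awaitQuietly(10);
        } catch (UncheckedInterrupt e) {
            System.out.println(e.getCause() instanceof InterruptedException);
        }
    }
}
```

Callers that care can still reach the original exception through getCause().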


> On March 9, 2015, 6:37 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java, line 224
> > <https://reviews.apache.org/r/31850/diff/2/?file=888890#file888890line224>
> >
> >     I think you only need to call this.wakeup() here?

I thought about this again. Actually, we should not invoke initiateClose in producer.close(timeout) when the timeout is negative. I'll change the code in close(timeout) and leave this code as is. That is also clearer because, from a semantic point of view, forceClose should do everything.
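
Roughly, the dispatch I have in mind looks like the sketch below. It is only an illustration, not the patch itself: FakeSender, its closing/forced flags, and the call log are hypothetical, and the handling of a non-negative timeout is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class CloseTimeoutSketch {
    // Hypothetical sender that records calls instead of doing network I/O.
    static class FakeSender {
        final List<String> calls = new ArrayList<>();
        boolean closing = false; // stop accepting new records
        boolean forced = false;  // abandon whatever is still pending

        // Graceful path: stop accepting records and let pending ones drain.
        void initiateClose() {
            calls.add("initiateClose");
            closing = true;
        }

        // Forced path: self-sufficient, so callers need not call
        // initiateClose() first.
        void forceClose() {
            calls.add("forceClose");
            closing = true;
            forced = true;
        }
    }

    // Assumed dispatch: a negative timeout goes straight to forceClose().
    static void close(FakeSender sender, long timeoutMs) {
        if (timeoutMs < 0) {
            sender.forceClose();
        } else {
            sender.initiateClose();
            // A real producer would now wait up to timeoutMs for the sender thread.
        }
    }

    public static void main(String[] args) {
        FakeSender forcedSender = new FakeSender();
        close(forcedSender, -1);
        System.out.println(forcedSender.calls);

        FakeSender gracefulSender = new FakeSender();
        close(gracefulSender, 1000);
        System.out.println(gracefulSender.calls);
    }
}
```

Keeping forceClose self-contained means close(timeout) never has to call both methods in sequence.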


> On March 9, 2015, 6:37 p.m., Guozhang Wang wrote:
> > core/src/test/scala/integration/kafka/api/ProducerSendTest.scala, line 346
> > <https://reviews.apache.org/r/31850/diff/2/?file=888893#file888893line346>
> >
> >     Could we reuse the producer in the ProducerSendTest class?

Probably not, because in this case the producer is closed in each iteration, so we have to create a new one.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75731
-----------------------------------------------------------


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review75731
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment122960>

    This is not related to this ticket, but I think we can just throw e here as well.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment122959>

    This function no longer throws InterruptException, but only KafkaException. Better change line 539 to just throw e.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment122966>

    Could metrics.close() be called simultaneously?



clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
<https://reviews.apache.org/r/31850/#comment122961>

    Space after ","



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment122964>

    How about renaming this to abortIncompleteBatches and returning batch.done(-1, new InterruptException(""))?



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment122965>

    I think you only need to call this.wakeup() here?



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
<https://reviews.apache.org/r/31850/#comment122967>

    Rename to testAbortIncompleteBatches?



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
<https://reviews.apache.org/r/31850/#comment122968>

    Could we make this object a class-level variable, since it is created in every test case?



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment122970>

    Could we reuse the producer in the ProducerSendTest class?



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment122973>

    Could we extract this to a separate test case?



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment122971>

    "i <- 0 until 50"?


- Guozhang Wang


On March 9, 2015, 4:14 a.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated March 9, 2015, 4:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated March 9, 2015, 4:14 a.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

A minor fix.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing
-------

Unit tests passed.


Thanks,

Jiangjie Qin


Re: Review Request 31850: Patch for KAFKA-1660

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/
-----------------------------------------------------------

(Updated March 9, 2015, 4:07 a.m.)


Review request for kafka.


Bugs: KAFKA-1660
    https://issues.apache.org/jira/browse/KAFKA-1660


Repository: kafka


Description (updated)
-------

Patch for KAFKA-1660: add a close method with a timeout to the producer. Added unit tests.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 7397e565fd865214529ffccadd4222d835ac8110 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java d5c79e2481d5e9a2524ac2ef6a6879f61cb7cb5f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ed9c63a6679e3aaf83d19fde19268553a4c107c2 
  clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
  clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java c1bc40648479d4c2ae4ac52f40dadc070a6bcf6f 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 

Diff: https://reviews.apache.org/r/31850/diff/


Testing (updated)
-------

Unit tests passed.


Thanks,

Jiangjie Qin