You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Jay Kreps <bo...@gmail.com> on 2015/02/07 21:59:52 UTC

Review Request 30763: Patch for KAFKA-1865

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------


Thanks,

Jay Kreps


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 8, 2015, 7:04 p.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 352
> > <https://reviews.apache.org/r/30763/diff/1/?file=853737#file853737line352>
> >
> >     This isn't properly reset in the case of InterruptionExceptions. This should be in a finally block.

Yeah this is a good point. We have somewhat ignored InterruptedException. I don't like to throw it from public methods because I think it then forces the user to deal with it even if they aren't going to call interrupt which is really annoying.

So how about this, I'll add an InterruptException which can be our unchecked version of InterruptedException. We can reset the interrupt flag and throw that (to differentiate from the generic KafkaException).


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review71573
-----------------------------------------------------------


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review71573
-----------------------------------------------------------


Minor issue with cleaning an InterruptionException, but otherwise looks good to me.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/30763/#comment117336>

    This isn't properly reset in the case of InterruptionExceptions. This should be in a finally block.


- Ewen Cheslack-Postava


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 17, 2015, 9:28 p.m., Jiangjie Qin wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 80
> > <https://reviews.apache.org/r/30763/diff/1/?file=853738#file853738line80>
> >
> >     I think we have to execute the callback before we wake up the caller thread. Otherwise if something went wrong in this batch, caller thread might not be aware of that before it's waken up and put a bunch of other stuff into producer, or commit offsets.
> >     For example, 
> >     In mirror maker:
> >     ...
> >     for (rec <- recs)
> >       producer.send(rec1);
> >     producer.flush();
> >     consumer.commitOffsets();
> >     ...
> >     The caller thread could have already committed offsets even if something went wrong in callback.

This is actually a good point. However we also want to have the same semantics as calling .get() on all the metadata futures. Currently we first unblock the future then execute the callback, I am going to reverse that ordering. So now the future isn't unblocked until all callbacks have been executed and this also gives us the property you described for flush (since flush is equivalent ot calling get()).


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72794
-----------------------------------------------------------


On Feb. 26, 2015, 6:37 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72794
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/30763/#comment118872>

    I think we have to execute the callback before we wake up the caller thread. Otherwise if something went wrong in this batch, caller thread might not be aware of that before it's waken up and put a bunch of other stuff into producer, or commit offsets.
    For example, 
    In mirror maker:
    ...
    for (rec <- recs)
      producer.send(rec1);
    producer.flush();
    consumer.commitOffsets();
    ...
    The caller thread could have already committed offsets even if something went wrong in callback.


- Jiangjie Qin


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 22, 2015, 2:46 a.m., Jiangjie Qin wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 90
> > <https://reviews.apache.org/r/30763/diff/2/?file=871826#file871826line90>
> >
> >     It seems we haven't update the produce result here yet. So the value() call does not have baseOffset and topic partition information.
> >     It looks we have three things to do here in order:
> >     1. set up the result
> >     2. invoke callbacks
> >     3. notify threads waiting on flush.
> >     Currently both 1 and 3 and done in this.produceFuture.done(), maybe we need to separate them?

This is a good catch. Actually though, I found an even bigger problem in this approach (see JIRA). I'll redo this patch.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73433
-----------------------------------------------------------


On Feb. 21, 2015, 11:37 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 21, 2015, 11:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java b8cdd145bfcc6633763b25fc9812c49627c8df92 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73433
-----------------------------------------------------------


Thanks Jay, just one comment below.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/30763/#comment119772>

    It seems we haven't update the produce result here yet. So the value() call does not have baseOffset and topic partition information.
    It looks we have three things to do here in order:
    1. set up the result
    2. invoke callbacks
    3. notify threads waiting on flush.
    Currently both 1 and 3 and done in this.produceFuture.done(), maybe we need to separate them?


- Jiangjie Qin


On Feb. 21, 2015, 11:37 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 21, 2015, 11:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java b8cdd145bfcc6633763b25fc9812c49627c8df92 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 24, 2015, 5:20 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 379
> > <https://reviews.apache.org/r/30763/diff/4/?file=873436#file873436line379>
> >
> >     Can also throw SchemaException.

Yeah I intentionally left this and IllegalArgumentException our of the docs since they basically indicate "programmer error" so you aren't expected to handle them, just crash and fix your code.


> On Feb. 24, 2015, 5:20 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 91
> > <https://reviews.apache.org/r/30763/diff/4/?file=873442#file873442line91>
> >
> >     I still think it is kind of waste to create duplicate RecordMetadata here and in done()..

I take your point, but I tried the refactor to remove it and it wasn't very simple plus I think it would be quite rare that you would ever actually invoke both paths. Plus optimizing adds another synchronous memory reference while these super quick object allocations may get optimized out anyway...


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73829
-----------------------------------------------------------


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 24, 2015, 5:20 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 91
> > <https://reviews.apache.org/r/30763/diff/4/?file=873442#file873442line91>
> >
> >     I still think it is kind of waste to create duplicate RecordMetadata here and in done()..
> 
> Jay Kreps wrote:
>     I take your point, but I tried the refactor to remove it and it wasn't very simple plus I think it would be quite rare that you would ever actually invoke both paths. Plus optimizing adds another synchronous memory reference while these super quick object allocations may get optimized out anyway...

Actually in done() we don't create any RecordMetadata... It looks we were always creating a RecordMeatadata instance even before this patch. So FutureRecordMetatdata.valueOrError() will return a new RecordMeataData. The change here is that instead of creating the new RecordMetadata instance in valueOrError (which only work after done() is called), we create the RecordMeataData instance directly.
I think it is a little bit hacky from the code style point of view, i.e. duplicates code. But we are not really creating more RecordMeataData objects compared with previous solution.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73829
-----------------------------------------------------------


On Feb. 26, 2015, 1:16 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 1:16 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 24, 2015, 5:20 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 91
> > <https://reviews.apache.org/r/30763/diff/4/?file=873442#file873442line91>
> >
> >     I still think it is kind of waste to create duplicate RecordMetadata here and in done()..
> 
> Jay Kreps wrote:
>     I take your point, but I tried the refactor to remove it and it wasn't very simple plus I think it would be quite rare that you would ever actually invoke both paths. Plus optimizing adds another synchronous memory reference while these super quick object allocations may get optimized out anyway...
> 
> Jiangjie Qin wrote:
>     Actually in done() we don't create any RecordMetadata... It looks we were always creating a RecordMeatadata instance even before this patch. So FutureRecordMetatdata.valueOrError() will return a new RecordMeataData. The change here is that instead of creating the new RecordMetadata instance in valueOrError (which only work after done() is called), we create the RecordMeataData instance directly.
>     I think it is a little bit hacky from the code style point of view, i.e. duplicates code. But we are not really creating more RecordMeataData objects compared with previous solution.

Yes, that is what I am trying to say. I agree that two places that invoke the constructor is not ideal, but I don't see a fix that is actually better. I tried--did a full refactoring as we discussed but I don't think it was actually either simpler or more efficient.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73829
-----------------------------------------------------------


On Feb. 26, 2015, 6:37 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73829
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/30763/#comment120232>

    Can also throw SchemaException.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/30763/#comment120233>

    I still think it is kind of waste to create duplicate RecordMetadata here and in done()..


- Guozhang Wang


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jay Kreps <bo...@gmail.com>.
I handle this by synchronizing flush()--only one thread can be flushing at
a time. This isn't too much of a drawback since the first flush will drain
everything anyway the second flush likely won't do too much, so sequencing
them shouldn't hurt too much.

However your idea of using a counter may actually be better and could
possibly remove the synchronization entirely. Let's both think that through
and see if we can think of any corner cases. If not I'll change to that.

-Jay

On Mon, Feb 23, 2015 at 8:46 PM, Jiangjie Qin <be...@gmail.com> wrote:

>
>
> > On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > > LGTM. Thanks, Jay.
> > > I actually tried just putting a synchronized block around the line
> where we copy the imcomplete set and it seems worked. Maybe we can do that
> if you prefer less code.
> >
> > Jay Kreps wrote:
> >     I think that depends on the lock the add/remove uses in the
> internals of Collections.syncronizedSet which could vary by JVM and
> version. I also think that whenever possible ad hoc synchronization should
> be encapsulated in a small class rather than sprinkled here and there in a
> larger class just so it is easy to verify correctness, even when that is
> slightly more code.
>
> Makes sense. It just occurred to me that current approach might causing a
> flush() wait up to linger.ms.
>
> Imagine there are two threads and with the following sequence:
> 1. thread 1 call flush
> 2. accumulator.flushing = true
> 3. sender thread woke up and did one drain.
> 4. thread 1 started wating on callback 1
> 5. thread 2 call send and followed by a flush
> 6. sender thread finished callback 1 and thread 1 set flushing to false.
> 7. sender thread will not be able to continue to honor the flush for
> thread 2 because flushing flag has been turned off.
> The message sent by thread 2 in step 5 will sitting in accumulator for
> linger.ms and thread 2 will be blocked.
>
> I think we can make the flushing to be an atomic interger instead of
> boolean, so each thread just increment it when begins flush and decrement
> it after flush finishes. As long as flushing > 0 the accumulator should
> flush the data.
>
>
> - Jiangjie
>
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/#review73749
> -----------------------------------------------------------
>
>
> On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> >
> > -----------------------------------------------------------
> > This is an automatically generated e-mail. To reply, visit:
> > https://reviews.apache.org/r/30763/
> > -----------------------------------------------------------
> >
> > (Updated Feb. 24, 2015, 2:31 a.m.)
> >
> >
> > Review request for kafka.
> >
> >
> > Bugs: KAFKA-1865
> >     https://issues.apache.org/jira/browse/KAFKA-1865
> >
> >
> > Repository: kafka
> >
> >
> > Description
> > -------
> >
> > KAFKA-1865 Add a flush() method to the producer.
> >
> >
> > Diffs
> > -----
> >
> >   clients/src/main/java/org/apache/kafka/clients/Metadata.java
> e8afecda956303a6ee116499fd443a54c018e17d
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> 1fd6917c8a5131254c740abad7f7228a47e3628c
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
> 84530f2b948f9abd74203db48707e490dd9c81a5
> >   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
> 17fe541588d462c68c33f6209717cc4015e9b62f
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
> 4a2da41f47994f778109e3c4107ffd90195f0bae
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> ecfe2144d778a5d9b614df5278b9f0a15637f10b
> >
>  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
> dd0af8aee98abed5d4a0dc50989e37888bb353fe
> >
>  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
> PRE-CREATION
> >   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
> 4ae43ed47e31ad8052b4348a731da11120968508
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
> 75513b0bdd439329c5771d87436ef83fda853bfb
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java
> 29c8417422c0cf0d29bf2405c77fd05e35350259
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
> 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
> >
>  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java
> 558942aaecd1b9f7098435d39aa4b362cd16ff0a
> >   core/src/test/scala/integration/kafka/api/ConsumerTest.scala
> 2802a399bf599e9530f53b7df72f12702a10d3c4
> >   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
> b15237b76def3b234924280fa3fdb25dbb0cc0dc
> >   core/src/test/scala/unit/kafka/utils/TestUtils.scala
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698
> >
> > Diff: https://reviews.apache.org/r/30763/diff/
> >
> >
> > Testing
> > -------
> >
> > New patch addresses feedback. Also (1) comments out the consumer tests
> so I could verify everything else passes and (2) moves some unit tests I
> found that were in the wrong packages.
> >
> >
> > Thanks,
> >
> > Jay Kreps
> >
> >
>
>

Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > LGTM. Thanks, Jay.
> > I actually tried just putting a synchronized block around the line where we copy the imcomplete set and it seems worked. Maybe we can do that if you prefer less code.
> 
> Jay Kreps wrote:
>     I think that depends on the lock the add/remove uses in the internals of Collections.syncronizedSet which could vary by JVM and version. I also think that whenever possible ad hoc synchronization should be encapsulated in a small class rather than sprinkled here and there in a larger class just so it is easy to verify correctness, even when that is slightly more code.

Makes sense. It just occurred to me that current approach might causing a flush() wait up to linger.ms.

Imagine there are two threads and with the following sequence:
1. thread 1 call flush
2. accumulator.flushing = true
3. sender thread woke up and did one drain.
4. thread 1 started wating on callback 1
5. thread 2 call send and followed by a flush
6. sender thread finished callback 1 and thread 1 set flushing to false.
7. sender thread will not be able to continue to honor the flush for thread 2 because flushing flag has been turned off.
The message sent by thread 2 in step 5 will sitting in accumulator for linger.ms and thread 2 will be blocked.

I think we can make the flushing to be an atomic interger instead of boolean, so each thread just increment it when begins flush and decrement it after flush finishes. As long as flushing > 0 the accumulator should flush the data.


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
-----------------------------------------------------------


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 24, 2015, 4:22 a.m., Jiangjie Qin wrote:
> > LGTM. Thanks, Jay.
> > I actually tried just putting a synchronized block around the line where we copy the imcomplete set and it seems worked. Maybe we can do that if you prefer less code.

I think that depends on the lock the add/remove uses in the internals of Collections.syncronizedSet which could vary by JVM and version. I also think that whenever possible ad hoc synchronization should be encapsulated in a small class rather than sprinkled here and there in a larger class just so it is easy to verify correctness, even when that is slightly more code.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
-----------------------------------------------------------


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73749
-----------------------------------------------------------

Ship it!


LGTM. Thanks, Jay.
I actually tried just putting a synchronized block around the line where we copy the imcomplete set and it seems worked. Maybe we can do that if you prefer less code.

- Jiangjie Qin


On Feb. 24, 2015, 2:31 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 24, 2015, 2:31 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 26, 2015, 7:15 p.m., Jun Rao wrote:
> > clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java, line 87
> > <https://reviews.apache.org/r/30763/diff/6/?file=878560#file878560line87>
> >
> >     Do we really want to print the stack trace?

I think so, we are not expecting an exception so we want to know if there is one rather than swallowing it, right?


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74332
-----------------------------------------------------------


On Feb. 26, 2015, 6:40 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> Fixed the boolean logic in the unit test as per Jun's suggestion.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jun Rao <ju...@gmail.com>.

> On Feb. 26, 2015, 7:15 p.m., Jun Rao wrote:
> > clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java, line 87
> > <https://reviews.apache.org/r/30763/diff/6/?file=878560#file878560line87>
> >
> >     Do we really want to print the stack trace?
> 
> Jay Kreps wrote:
>     I think so, we are not expecting an exception so we want to know if there is one rather than swallowing it, right?

If the test passes, people may not look at the output of the unit test. Perhaps we can set some state in this case and assert on it in the unit test thread?


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74332
-----------------------------------------------------------


On Feb. 26, 2015, 6:40 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> Fixed the boolean logic in the unit test as per Jun's suggestion.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 26, 2015, 7:15 p.m., Jun Rao wrote:
> > clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java, line 87
> > <https://reviews.apache.org/r/30763/diff/6/?file=878560#file878560line87>
> >
> >     Do we really want to print the stack trace?
> 
> Jay Kreps wrote:
>     I think so, we are not expecting an exception so we want to know if there is one rather than swallowing it, right?
> 
> Jun Rao wrote:
>     If the test passes, people may not look at the output of the unit test. Perhaps we can set some state in this case and assert on it in the unit test thread?

Cool, I'll do that.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74332
-----------------------------------------------------------


On Feb. 26, 2015, 6:40 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> Fixed the boolean logic in the unit test as per Jun's suggestion.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74332
-----------------------------------------------------------

Ship it!


Looks good to me. Just a couple of minor comments.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/30763/#comment120899>

    We should remove this comment, right?



clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
<https://reviews.apache.org/r/30763/#comment120901>

    Do we really want to print the stack trace?


- Jun Rao


On Feb. 26, 2015, 6:40 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 6:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> Fixed the boolean logic in the unit test as per Jun's suggestion.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 26, 2015, 6:40 p.m.)


Review request for kafka.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing (updated)
-------

The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.

Fixed the boolean logic in the unit test as per Jun's suggestion.


Thanks,

Jay Kreps


Re: Review Request 30763: KAFKA-1865: add flush to producer

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 26, 2015, 6:37 p.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1865: add flush to producer


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------

The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.


Thanks,

Jay Kreps


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 26, 2015, 7:14 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java, line 132
> > <https://reviews.apache.org/r/30763/diff/5/?file=877020#file877020line132>
> >
> >     synchronized no longer matches the regular non-Mock version.

That is correct, but the synchonization strategy for the mock is generally different than for the real producer--the mock just has a global lock on everything. This is true for send as well.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74229
-----------------------------------------------------------


On Feb. 26, 2015, 1:16 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 1:16 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74229
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
<https://reviews.apache.org/r/30763/#comment120749>

    synchronized no longer matches the regular non-Mock version.


- Ewen Cheslack-Postava


On Feb. 26, 2015, 1:16 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 1:16 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 26, 2015, 5:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 503
> > <https://reviews.apache.org/r/30763/diff/5/?file=877019#file877019line503>
> >
> >     Not sure how we make sure only a single flush at a time. Could you explain a bit more?

Actually we don't. Jiangjie's suggestion which I implemented in this patch allows any number of flushes to occur concurrently without synchronization.

The way it works is that each thread that begins a flush increments the counter, when it is complete it decrements the counter. As long as the counter is > 0 flushing is needed and linger is disabled.


> On Feb. 26, 2015, 5:47 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 348
> > <https://reviews.apache.org/r/30763/diff/5/?file=877024#file877024line348>
> >
> >     Could flushesInProgress just be an AtomicBoolean?

See above.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74196
-----------------------------------------------------------


On Feb. 26, 2015, 1:16 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 1:16 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review74196
-----------------------------------------------------------


Thanks for the patch. A few comments below.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/30763/#comment120733>

    Not sure how we make sure only a single flush at a time. Could you explain a bit more?



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/30763/#comment120734>

    Could flushesInProgress just be an AtomicBoolean?



clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
<https://reviews.apache.org/r/30763/#comment120738>

    This assertion doesn't seem cover the case that one of the future is done, but the other is not.


- Jun Rao


On Feb. 26, 2015, 1:16 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 26, 2015, 1:16 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 26, 2015, 1:16 a.m.)


Review request for kafka.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing (updated)
-------

The latest patch uses Jiangjie's suggestion to remove the synchronization on flush.


Thanks,

Jay Kreps


Re: Review Request 30763: KAFKA-1865: Producer Flush: Remove synchronization.

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 26, 2015, 1:15 a.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1865: Producer Flush: Remove synchronization.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------

New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.


Thanks,

Jay Kreps


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 24, 2015, 2:31 a.m.)


Review request for kafka.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing (updated)
-------

New patch addresses feedback. Also (1) comments out the consumer tests so I could verify everything else passes and (2) moves some unit tests I found that were in the wrong packages.


Thanks,

Jay Kreps


Re: Review Request 30763: KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 24, 2015, 2:29 a.m.)


Review request for kafka.


Summary (updated)
-----------------

KAFKA-1865: Producer Flush: if at first your patch doesn't succeed, try, try, try again.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 29c8417422c0cf0d29bf2405c77fd05e35350259 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------


Thanks,

Jay Kreps


Re: Review Request 30763: Second attempt at flush()

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 23, 2015, 5 a.m., Jiangjie Qin wrote:
> > clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java, line 210
> > <https://reviews.apache.org/r/30763/diff/3/?file=872457#file872457line210>
> >
> >     It probably does not matter, but here we are only sending 10 messages which can be put into one batch. Should we test the case where accumulator has more than one batch for a partition?

Fair point.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73505
-----------------------------------------------------------


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73505
-----------------------------------------------------------

Ship it!


Thanks Jay. Looks good to me. Just a minor comment.


clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
<https://reviews.apache.org/r/30763/#comment119860>

    It probably does not matter, but here we are only sending 10 messages which can be put into one batch. Should we test the case where accumulator has more than one batch for a partition?


- Jiangjie Qin


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 23, 2015, 8:53 p.m., Jiangjie Qin wrote:
> > Hi Jay, I applied the patch and tried to run it in our test environment. I got this exception:
> > 
> > java.util.ConcurrentModificationException
> >         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
> >         at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
> >         at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> >         at java.util.HashSet.<init>(HashSet.java:119)
> >         at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
> >         at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
> >         at kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)
> > 
> > It looks that we should synchronize on the access to incomplete. i.e. when one thread is making a copy of imcomplete set, other thread should not add batches into it.
> 
> Jiangjie Qin wrote:
>     From the java doc of Collection.synchronizedSet()
>     
>     It is imperative that the user manually synchronize on the returned set when iterating over it:
>             Set s = Collections.synchronizedSet(new HashSet());
>                 ...
>             synchronized (s) {
>                 Iterator i = s.iterator(); // Must be in the synchronized block
>                 while (i.hasNext())
>                     foo(i.next());
>             }

Ack, nice catch. Didn't realize that. Will fix.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
-----------------------------------------------------------


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jiangjie Qin <be...@gmail.com>.

> On Feb. 23, 2015, 8:53 p.m., Jiangjie Qin wrote:
> > Hi Jay, I applied the patch and tried to run it in our test environment. I got this exception:
> > 
> > java.util.ConcurrentModificationException
> >         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
> >         at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
> >         at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> >         at java.util.HashSet.<init>(HashSet.java:119)
> >         at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
> >         at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
> >         at kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
> >         at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)
> > 
> > It looks that we should synchronize on the access to incomplete. i.e. when one thread is making a copy of imcomplete set, other thread should not add batches into it.

>From the java doc of Collection.synchronizedSet()

It is imperative that the user manually synchronize on the returned set when iterating over it:
        Set s = Collections.synchronizedSet(new HashSet());
            ...
        synchronized (s) {
            Iterator i = s.iterator(); // Must be in the synchronized block
            while (i.hasNext())
                foo(i.next());
        }


- Jiangjie


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
-----------------------------------------------------------


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jiangjie Qin <be...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
-----------------------------------------------------------


Hi Jay, I applied the patch and tried to run it in our test environment. I got this exception:

java.util.ConcurrentModificationException
        at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
        at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
        at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
        at java.util.HashSet.<init>(HashSet.java:119)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
        at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
        at kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
        at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)

It looks that we should synchronize on the access to incomplete. i.e. when one thread is making a copy of imcomplete set, other thread should not add batches into it.

- Jiangjie Qin


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 23, 2015, 6:44 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 376
> > <https://reviews.apache.org/r/30763/diff/3/?file=872445#file872445line376>
> >
> >     Add @throws KafkaException

I added the other exceptions I know about. I didn't add KafkaException as I'm not sure if we ever directly throw that and I just want to comment things the user may need to be aware of (i.e. there are many programming errors like invalid partitions that will give IllegalArgumentException, but I don't cover those). Let me know if you see an exception I missed.


> On Feb. 23, 2015, 6:44 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 91
> > <https://reviews.apache.org/r/30763/diff/3/?file=872451#file872451line91>
> >
> >     One possible optimization is to keep a RecordMetadata field in the FutureRecordMetadata, and value() call will then only create the object once. Here we could then call
> >     
> >     callback.onCompletion(thunk.future.value());

Tried to do this but it gets ugly because you have to split done() too and add another volatile access. I think in practice you either use the callback or the future so these should generally not duplicate.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73475
-----------------------------------------------------------


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73475
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/30763/#comment119810>

    Add @throws KafkaException



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
<https://reviews.apache.org/r/30763/#comment119813>

    One possible optimization is to keep a RecordMetadata field in the FutureRecordMetadata, and value() call will then only create the object once. Here we could then call
    
    callback.onCompletion(thunk.future.value());


- Guozhang Wang


On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 23, 2015, 12:26 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Second attempt at flush()

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 23, 2015, 12:26 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Second attempt at flush()


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------


Thanks,

Jay Kreps


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/
-----------------------------------------------------------

(Updated Feb. 21, 2015, 11:37 p.m.)


Review request for kafka.


Bugs: KAFKA-1865
    https://issues.apache.org/jira/browse/KAFKA-1865


Repository: kafka


Description
-------

KAFKA-1865 Add a flush() method to the producer.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/Metadata.java b8cdd145bfcc6633763b25fc9812c49627c8df92 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
  clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
  clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3 
  clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8 
  clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698 

Diff: https://reviews.apache.org/r/30763/diff/


Testing
-------


Thanks,

Jay Kreps


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Jay Kreps <bo...@gmail.com>.

> On Feb. 17, 2015, 11:46 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/TestUtils.scala, line 400
> > <https://reviews.apache.org/r/30763/diff/1/?file=853742#file853742line400>
> >
> >     Indentation?

Looks right to me...


> On Feb. 17, 2015, 11:46 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java, line 409
> > <https://reviews.apache.org/r/30763/diff/1/?file=853734#file853734line409>
> >
> >     Add @Throws KafkaException; in fact, flush interruption should never happen as ProduceRequestResult does not have interrupt APIs right?

I think any thread that interrupts the app thread while it waits on flush would get this. Documented it.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72821
-----------------------------------------------------------


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>


Re: Review Request 30763: Patch for KAFKA-1865

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72821
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/30763/#comment118910>

    Add @Throws KafkaException; in fact, flush interruption should never happen as ProduceRequestResult does not have interrupt APIs right?



core/src/test/scala/unit/kafka/utils/TestUtils.scala
<https://reviews.apache.org/r/30763/#comment118912>

    Indentation?


- Guozhang Wang


On Feb. 7, 2015, 8:59 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
> 
> (Updated Feb. 7, 2015, 8:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-1865 Add a flush() method to the producer.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 54755e8dd3f23ced313067566cd4ea867f8a496e 
> 
> Diff: https://reviews.apache.org/r/30763/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>