You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Guozhang Wang <gu...@linkedin.com> on 2014/07/01 00:47:53 UTC

Re: Review Request 22874: Fix KAFKA-1498

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22874/
-----------------------------------------------------------

(Updated June 30, 2014, 10:47 p.m.)


Review request for kafka.


Bugs: KAFKA-1498
    https://issues.apache.org/jira/browse/KAFKA-1498


Repository: kafka


Description
-------

1. Use a size limit on the memory records to guard too-large message cases; 2. Caller thread check partition readiness due to batch size upon append, and only wake up sender when the appended partition is ready; 3. Sender thread select time based on the partition readiness timeout and metadata timeout. 4. Mirror maker to use one blocking queue per producer thread. 5. The select time in NetworkClient is minimum of the partition readiness (if there is no data at all will be Long.MAX_VALUE) and metadata expiry timeout, hence upper bounded by the metadata age; On the other hand when the appended batch is full or a new batch is created, the producer thread will wake up the network thread. 6. Also fixed a transient test bug in SocketServerTest and speed-up the testNoResponse case of ProducerFailureHandling. 7. Other minor fixes.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 57bc285c20b5af8957bcc5322cd75c021a5af215 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 6fb5b82dedb48d946d1ac1ec7a535bddfdc693fa 
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 6a3cdcc1f2542479f37bc339baca87464c01e84e 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 8b4ac0f9a59b4f2e67e48e6d9b0d9fe340f77166 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 763839157d9736f15110072bcae93fc7fdc33f55 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala d1464447c65231abeaa52e94d5a48cf62f054881 
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 62fb02cf02d3876b9804d756c4bf8514554cc836 

Diff: https://reviews.apache.org/r/22874/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 22874: Fix KAFKA-1498

Posted by Jay Kreps <bo...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22874/#review47100
-----------------------------------------------------------

Ship it!


- Jay Kreps


On June 30, 2014, 10:47 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22874/
> -----------------------------------------------------------
> 
> (Updated June 30, 2014, 10:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1498
>     https://issues.apache.org/jira/browse/KAFKA-1498
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Use a size limit on the memory records to guard too-large message cases; 2. Caller thread check partition readiness due to batch size upon append, and only wake up sender when the appended partition is ready; 3. Sender thread select time based on the partition readiness timeout and metadata timeout. 4. Mirror maker to use one blocking queue per producer thread. 5. The select time in NetworkClient is minimum of the partition readiness (if there is no data at all will be Long.MAX_VALUE) and metadata expiry timeout, hence upper bounded by the metadata age; On the other hand when the appended batch is full or a new batch is created, the producer thread will wake up the network thread. 6. Also fixed a transient test bug in SocketServerTest and speed-up the testNoResponse case of ProducerFailureHandling. 7. Other minor fixes.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 57bc285c20b5af8957bcc5322cd75c021a5af215 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 6fb5b82dedb48d946d1ac1ec7a535bddfdc693fa 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 6a3cdcc1f2542479f37bc339baca87464c01e84e 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 8b4ac0f9a59b4f2e67e48e6d9b0d9fe340f77166 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 763839157d9736f15110072bcae93fc7fdc33f55 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala d1464447c65231abeaa52e94d5a48cf62f054881 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 62fb02cf02d3876b9804d756c4bf8514554cc836 
> 
> Diff: https://reviews.apache.org/r/22874/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 22874: Fix KAFKA-1498

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22874/
-----------------------------------------------------------

(Updated July 1, 2014, 6:12 p.m.)


Review request for kafka.


Bugs: KAFKA-1498
    https://issues.apache.org/jira/browse/KAFKA-1498


Repository: kafka


Description
-------

1. Use a size limit on the memory records to guard too-large message cases; 2. Caller thread check partition readiness due to batch size upon append, and only wake up sender when the appended partition is ready; 3. Sender thread select time based on the partition readiness timeout and metadata timeout. 4. Mirror maker to use one blocking queue per producer thread. 5. The select time in NetworkClient is minimum of the partition readiness (if there is no data at all will be Long.MAX_VALUE) and metadata expiry timeout, hence upper bounded by the metadata age; On the other hand when the appended batch is full or a new batch is created, the producer thread will wake up the network thread. 6. Also fixed a transient test bug in SocketServerTest and speed-up the testNoResponse case of ProducerFailureHandling. 7. Other minor fixes.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e 
  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 57bc285c20b5af8957bcc5322cd75c021a5af215 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 6fb5b82dedb48d946d1ac1ec7a535bddfdc693fa 
  clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 6a3cdcc1f2542479f37bc339baca87464c01e84e 
  clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 8b4ac0f9a59b4f2e67e48e6d9b0d9fe340f77166 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05 
  core/src/main/scala/kafka/tools/MirrorMaker.scala 763839157d9736f15110072bcae93fc7fdc33f55 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala d1464447c65231abeaa52e94d5a48cf62f054881 
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 62fb02cf02d3876b9804d756c4bf8514554cc836 

Diff: https://reviews.apache.org/r/22874/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 22874: Fix KAFKA-1498

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/22874/#review47075
-----------------------------------------------------------


Could we add a record-size-avg metric in Sender?


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/22874/#comment82677>

    Is the comment still valid? I don't see the default select timeout.



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/22874/#comment82687>

    Is the comment still valid? Don't see the default select time value.


- Jun Rao


On June 30, 2014, 10:47 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/22874/
> -----------------------------------------------------------
> 
> (Updated June 30, 2014, 10:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1498
>     https://issues.apache.org/jira/browse/KAFKA-1498
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Use a size limit on the memory records to guard too-large message cases; 2. Caller thread check partition readiness due to batch size upon append, and only wake up sender when the appended partition is ready; 3. Sender thread select time based on the partition readiness timeout and metadata timeout. 4. Mirror maker to use one blocking queue per producer thread. 5. The select time in NetworkClient is minimum of the partition readiness (if there is no data at all will be Long.MAX_VALUE) and metadata expiry timeout, hence upper bounded by the metadata age; On the other hand when the appended batch is full or a new batch is created, the producer thread will wake up the network thread. 6. Also fixed a transient test bug in SocketServerTest and speed-up the testNoResponse case of ProducerFailureHandling. 7. Other minor fixes.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 522881c972ca42ff4dfb6237a2db15b625334d7e 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 00775abbcac850b0f2bb9a70b6fbc7cdf319bcf6 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 57bc285c20b5af8957bcc5322cd75c021a5af215 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 1ed3c28b436d28381d9402896e32d16f2586c65e 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 6fb5b82dedb48d946d1ac1ec7a535bddfdc693fa 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 759f577eaf0e7d28a84926d4aa30f4ef0cb27bc2 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 6a3cdcc1f2542479f37bc339baca87464c01e84e 
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 8b4ac0f9a59b4f2e67e48e6d9b0d9fe340f77166 
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 93b58d02eac0f8ca28440e3e0ebea28ed3a7673c 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 5489acac6806b3ae5e6d568d401d5a20c86cac05 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 763839157d9736f15110072bcae93fc7fdc33f55 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala d1464447c65231abeaa52e94d5a48cf62f054881 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 62fb02cf02d3876b9804d756c4bf8514554cc836 
> 
> Diff: https://reviews.apache.org/r/22874/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>