Posted to dev@kafka.apache.org by Manikumar Reddy O <ma...@gmail.com> on 2014/08/03 19:40:06 UTC

Review Request 24214: Patch for KAFKA-1374

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
-------

LogCleaner code decompresses the compressed messages and writes back the retained/compacted messages in compressed form
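The core flow described above can be sketched in plain Java with hypothetical types (Kafka's actual Message/ByteBufferMessageSet classes differ): decompress a batch, retain only the latest message per key, and hand the survivors back to be rewritten in compressed form.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the clean-and-recompress flow (hypothetical types,
// not Kafka's actual API). The "decompressed batch" is modeled as a list.
public class LogCleanerSketch {
    record Message(String key, String value) {}

    // Keep only the last occurrence of each key, preserving batch order.
    static List<Message> retainLatestPerKey(List<Message> decompressedBatch) {
        Map<String, Message> latest = new HashMap<>();
        for (Message m : decompressedBatch) {
            latest.put(m.key(), m); // later occurrences win
        }
        List<Message> retained = new ArrayList<>();
        for (Message m : decompressedBatch) {
            if (latest.get(m.key()) == m) {
                retained.add(m); // survivor; caller would recompress these
            }
        }
        return retained;
    }

    public static void main(String[] args) {
        List<Message> batch = List.of(
            new Message("k1", "old"), new Message("k1", "new"), new Message("k2", "v"));
        System.out.println(retainLatestPerKey(batch));
    }
}
```

The retained messages would then be re-wrapped with the batch's compression codec before being appended to the cleaned segment.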


Diffs
-----

  core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 409
> > <https://reviews.apache.org/r/24214/diff/9/?file=824405#file824405line409>
> >
> >     I would suggest one of two options over this (i.e., instead of two helper methods)
> >     - Inline both here and get rid of those
> >     - Have a single private helper (e.g., collectRetainedMessages)

Removed the helper methods.


> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 479
> > <https://reviews.apache.org/r/24214/diff/9/?file=824405#file824405line479>
> >
> >     We should now compress with the compression codec of the topic (KAFKA-1499)

Will do this as a separate JIRA.


> On May 12, 2015, 2:01 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, line 498
> > <https://reviews.apache.org/r/24214/diff/9/?file=824405#file824405line498>
> >
> >     We should instead do a trivial refactor in ByteBufferMessageSet to compress messages in a preallocated buffer. It would be preferable to avoid having this compression logic in different places.

Moved the compressMessages() method to the ByteBufferMessageSet class. Please let me know your thoughts.


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review83392
-----------------------------------------------------------


On May 18, 2015, 5:29 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated May 18, 2015, 5:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala abea8b251895a5cc0788c6e25b112a2935a3f631 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9dfe914991aaf82162e5e300c587c794555d5fd0 
>   core/src/main/scala/kafka/message/MessageSet.scala 28b56e68cfdbbf107dd7cbd248ffa8fa6bbcd13f 
>   core/src/test/scala/kafka/tools/TestLogCleaning.scala 844589427cb9337acd89a5239a98b811ee58118e 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 3b5aa9dc3b7ac5893c1d281ae1326be0e9ed8aad 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 76d3bfd378f32fd2b216b3ebdec86e2070491924 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> /*TestLogCleaning stress test output for compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
> 100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
> De-duplicating and validating output files...
> Validated 9005 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
> 1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
> 10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> /*TestLogCleaning stress test output for non-compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
> 100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
> De-duplicating and validating output files...
> Validated 17874 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
> 1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
> 10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review83392
-----------------------------------------------------------


Sorry for the delay. Overall, this looks good.

As discussed earlier, this patch needs a minor rebase.

There are a couple of points to note:
- In KAFKA-1499 you added broker-side compression. When writing out the compacted messages, we should compress with the configured compression codec. We can do this as an incremental change if you prefer: your current patch makes the log cleaner compression-aware, and a subsequent patch can handle writing out with the configured codec. That part could be non-trivial, since we would then probably want to do some batching when writing out compacted compressed messages.
- In KAFKA-1755 I added some defensive code to prevent compressed messages and unkeyed messages from getting in. The compression-related code will need to be removed.

Let me know if you need help with any of this.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134376>

    I would suggest one of two options over this (i.e., instead of two helper methods)
    - Inline both here and get rid of those
    - Have a single private helper (e.g., collectRetainedMessages)



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134377>

    We should now compress with the compression codec of the topic (KAFKA-1499)



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment134378>

    We should instead do a trivial refactor in ByteBufferMessageSet to compress messages in a preallocated buffer. It would be preferable to avoid having this compression logic in different places.


- Joel Koshy


On Jan. 17, 2015, 6:53 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Jan. 17, 2015, 6:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Updating the rebased code
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala af496f7c547a5ac7a4096a6af325dad0d8feec6f 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 07acd460b1259e0a3f4069b8b8dcd8123ef5810e 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> /*TestLogCleaning stress test output for compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
> 100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
> De-duplicating and validating output files...
> Validated 9005 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
> 1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
> 10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> /*TestLogCleaning stress test output for non-compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
> 100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
> De-duplicating and validating output files...
> Validated 17874 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
> 1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
> 10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Eric Olander <ol...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review68569
-----------------------------------------------------------



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
<https://reviews.apache.org/r/24214/#comment112888>

    Could be simplified to just:
    for (codec <- CompressionType.values) yield Array(codec.name)


- Eric Olander


On Jan. 17, 2015, 6:53 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Jan. 17, 2015, 6:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Updating the rebased code
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala f8e7cd5fabce78c248a9027c4bb374a792508675 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala af496f7c547a5ac7a4096a6af325dad0d8feec6f 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 07acd460b1259e0a3f4069b8b8dcd8123ef5810e 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> /*TestLogCleaning stress test output for compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
> 100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
> De-duplicating and validating output files...
> Validated 9005 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
> 1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
> 10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> /*TestLogCleaning stress test output for non-compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
> 100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
> De-duplicating and validating output files...
> Validated 17874 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
> 1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
> 10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review84278
-----------------------------------------------------------


Thanks for the updated patch. This looks good. I ended up rebasing while you were working on this :) I have a few additional edits, noted below, which I will upload shortly.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment135488>

    Minor improvement: we can avoid an extra copy by filtering the iterator above, and then materializing once.
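The filter-before-materialize point can be illustrated in plain Java (hypothetical helper names, not the LogCleaner code itself): buffering everything and then filtering builds two result buffers, while filtering as the iterator is drained builds only one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Contrast of the two strategies the review comment describes.
public class FilterOnce {
    // Two copies: buffer everything first, then filter into a second buffer.
    static List<Integer> twoCopies(Iterator<Integer> it, Predicate<Integer> keep) {
        List<Integer> all = new ArrayList<>();
        it.forEachRemaining(all::add);
        List<Integer> out = new ArrayList<>();
        for (int x : all) if (keep.test(x)) out.add(x);
        return out;
    }

    // One copy: filter while draining the iterator, materializing once.
    static List<Integer> oneCopy(Iterator<Integer> it, Predicate<Integer> keep) {
        List<Integer> out = new ArrayList<>();
        while (it.hasNext()) {
            int x = it.next();
            if (keep.test(x)) out.add(x);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(oneCopy(List.of(1, 2, 3, 4).iterator(), x -> x % 2 == 0));
    }
}
```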



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment135497>

    I'm wondering if it would be helpful to split the stats into compressed vs. non-compressed.
    
    E.g., x bytes read (from y compressed bytes); n messages read (from m compressed messages) and so on...



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment135489>

    The last statement can be !redundant && !obsoleteDelete



core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
<https://reviews.apache.org/r/24214/#comment135490>

    I actually had a different thought - i.e., to avoid duplicating the compression code in BBMS. Then I ran into the issue that you probably saw - the BBMS create method isn't very amenable to refactoring with pre-assigned offsets. So I think what you originally had was actually better.
    
    Ideally we should have a compress (raw bytes) method and just use that in both places. In fact, we can consider using the Compressor in clients - which will have the added benefit of identical compression in use in both the broker and clients. E.g., right now it is possible to be under the message size limit on the client and still exceed it on the broker.
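As a rough illustration of the "one compress (raw bytes) method used in both places" idea, a minimal shared helper might look like this (GZIP only for illustration; Kafka supports several codecs and would want a preallocated-buffer variant):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Sketch of a single shared compress-raw-bytes helper that both the
// producer path and the broker-side cleaner could reuse, so the two
// sides compress identically.
public class CompressHelper {
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] round = decompress(compress("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(round, StandardCharsets.UTF_8)); // prints "hello"
    }
}
```

Sharing one helper would also address the size-limit mismatch noted above: a batch that fits under the message size limit on the client would compress the same way on the broker.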



core/src/main/scala/kafka/message/MessageSet.scala
<https://reviews.apache.org/r/24214/#comment135491>

    Can do without this addition.



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
<https://reviews.apache.org/r/24214/#comment135494>

    Minor improvement here to avoid the extra hashmap.



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
<https://reviews.apache.org/r/24214/#comment135495>

    Can use Stream.cons for convenience.



core/src/test/scala/unit/kafka/log/LogTest.scala
<https://reviews.apache.org/r/24214/#comment135496>

    Few more minor edits - to test appending keyed compressed messages.


- Joel Koshy


On May 18, 2015, 5:29 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated May 18, 2015, 5:29 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala abea8b251895a5cc0788c6e25b112a2935a3f631 
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9dfe914991aaf82162e5e300c587c794555d5fd0 
>   core/src/main/scala/kafka/message/MessageSet.scala 28b56e68cfdbbf107dd7cbd248ffa8fa6bbcd13f 
>   core/src/test/scala/kafka/tools/TestLogCleaning.scala 844589427cb9337acd89a5239a98b811ee58118e 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 3b5aa9dc3b7ac5893c1d281ae1326be0e9ed8aad 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 76d3bfd378f32fd2b216b3ebdec86e2070491924 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> /*TestLogCleaning stress test output for compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
> 100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
> De-duplicating and validating output files...
> Validated 9005 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
> 1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
> 10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> /*TestLogCleaning stress test output for non-compressed messages*/
> 
> Producing 100000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
> 100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
> De-duplicating and validating output files...
> Validated 17874 values, 0 mismatches.
> 
> Producing 1000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
> 1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
> De-duplicating and validating output files...
> Validated 89947 values, 0 mismatches.
> 
> Producing 10000000 messages...
> Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
> Sleeping for 120 seconds...
> Consuming messages...
> Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
> 10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
> De-duplicating and validating output files...
> Validated 899853 values, 0 mismatches.
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated May 18, 2015, 5:29 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Addressing Joel's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala abea8b251895a5cc0788c6e25b112a2935a3f631 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9dfe914991aaf82162e5e300c587c794555d5fd0 
  core/src/main/scala/kafka/message/MessageSet.scala 28b56e68cfdbbf107dd7cbd248ffa8fa6bbcd13f 
  core/src/test/scala/kafka/tools/TestLogCleaning.scala 844589427cb9337acd89a5239a98b811ee58118e 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 3b5aa9dc3b7ac5893c1d281ae1326be0e9ed8aad 
  core/src/test/scala/unit/kafka/log/LogTest.scala 76d3bfd378f32fd2b216b3ebdec86e2070491924 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------

/*TestLogCleaning stress test output for compressed messages*/

Producing 100000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


/*TestLogCleaning stress test output for non-compressed messages*/

Producing 100000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
De-duplicating and validating output files...
Validated 17874 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Jan. 17, 2015, 6:53 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
-------

Updating the rebased code


Diffs
-----

  core/src/main/scala/kafka/log/LogCleaner.scala f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala af496f7c547a5ac7a4096a6af325dad0d8feec6f 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 07acd460b1259e0a3f4069b8b8dcd8123ef5810e 

Diff: https://reviews.apache.org/r/24214/diff/


Testing (updated)
-------

/*TestLogCleaning stress test output for compressed messages*/

Producing 100000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-6014466306002699464.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-177538909590644701.txt
100000 rows of data produced, 13165 rows of data consumed (86.8% reduction).
De-duplicating and validating output files...
Validated 9005 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3298578695475992991.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-7192293977610206930.txt
1000000 rows of data produced, 119926 rows of data consumed (88.0% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-3336255463347572934.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-9149188270705707725.txt
10000000 rows of data produced, 1645281 rows of data consumed (83.5% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


/*TestLogCleaning stress test output for non-compressed messages*/

Producing 100000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-5174543709786189363.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5143455017777144701.txt
100000 rows of data produced, 22775 rows of data consumed (77.2% reduction).
De-duplicating and validating output files...
Validated 17874 values, 0 mismatches.

Producing 1000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-7814446915546169271.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-5172557663160447626.txt
1000000 rows of data produced, 129230 rows of data consumed (87.1% reduction).
De-duplicating and validating output files...
Validated 89947 values, 0 mismatches.

Producing 10000000 messages...
Logging produce requests to /tmp/kafka-log-cleaner-produced-6092986571905399164.txt
Sleeping for 120 seconds...
Consuming messages...
Logging consumed messages to /tmp/kafka-log-cleaner-consumed-63626021421841220.txt
10000000 rows of data produced, 1136608 rows of data consumed (88.6% reduction).
De-duplicating and validating output files...
Validated 899853 values, 0 mismatches.


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Jan. 17, 2015, 6:51 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Updating the rebased code


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala f8e7cd5fabce78c248a9027c4bb374a792508675 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala af496f7c547a5ac7a4096a6af325dad0d8feec6f 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 07acd460b1259e0a3f4069b8b8dcd8123ef5810e 

Diff: https://reviews.apache.org/r/24214/diff/


Testing (updated)
-------

/safe/KAFKA/docs/TestLogCleaning.txt


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Oct. 3, 2014, 1:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
-------

Fixed a couple of bugs and updated stress test details


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Oct. 3, 2014, 1:22 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Fixed a couple of bugs and updated stress test details


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Sept. 23, 2014, 4:20 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Addressing Jun's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 18, 2014, 5:32 p.m., Joel Koshy wrote:
> > I should be able to review this later today. However, as Jun also mentioned, can you please run the stress test? When I was working on the original (WIP) patch, it worked but eventually failed (due to various reasons such as corrupt message sizes, etc.) on a stress test after several segments had rolled and after several log cleaner runs - although I didn't get time to look into it, your patch should hopefully have addressed these issues.

I tested the patch with my own test code and it is working fine.

I ran the TestLogCleaning stress test. Sometimes this test fails,
but I am not seeing any broker-side errors or corrupt messages.

I also ran TestLogCleaning on trunk (without my patch). This test fails for multiple topics.
I am looking into the TestLogCleaning code and will fix any issues I find.

I will keep you updated on the testing status.


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
-----------------------------------------------------------


On Sept. 23, 2014, 4:20 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Sept. 23, 2014, 4:20 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressing Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 18, 2014, 5:32 p.m., Joel Koshy wrote:
> > I should be able to review this later today. However, as Jun also mentioned, can you please run the stress test? When I was working on the original (WIP) patch, it worked but eventually failed (due to various reasons such as corrupt message sizes, etc.) on a stress test after several segments had rolled and after several log cleaner runs - although I didn't get time to look into it, your patch should hopefully have addressed these issues.
> 
> Manikumar Reddy O wrote:
>     I tested the patch with my own test code and it is working fine.
>     
>     I ran the TestLogCleaning stress test. Sometimes this test fails,
>     but I am not seeing any broker-side errors or corrupt messages.
>     
>     I also ran TestLogCleaning on trunk (without my patch). This test fails for multiple topics.
>     I am looking into the TestLogCleaning code and will fix any issues I find.
>     
>     I will keep you updated on the testing status.

I successfully ran the TestLogCleaning stress test. I ran the test for 1, 5, and 10 million messages.

Jun,

I removed the usage of the MemoryRecords and Compressor.putRecord classes from this patch. Currently Compressor.close() returns a compressed message whose offset is based on the number of messages in that batch (if I compress message offsets 10, 11, 12, 13, 14, 15, then the compressed message will have offset 5).
Because of this behavior, we cannot use it for server-side compression (on the server side, if I compress message offsets 10, 11, 12, 13, 14, 15, then the compressed message should have offset 15).
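The offset mismatch described above can be sketched with illustrative values (the names below are hypothetical and not the actual Compressor API):

```scala
// Illustrative sketch of the wrapper-offset mismatch described above.
// Inner (logical) offsets of the messages being recompressed:
val innerOffsets = Seq(10L, 11L, 12L, 13L, 14L, 15L)

// Client-side Compressor.close() assigns the wrapper an offset derived
// from the message count (count - 1 = 5 here). That is fine for a
// producer, because the broker reassigns offsets on append.
val clientWrapperOffset = innerOffsets.size - 1L

// For server-side recompression (log compaction), the wrapper must carry
// the absolute offset of the last inner message (15 here) so that fetch
// requests resume at the correct position.
val brokerWrapperOffset = innerOffsets.last
```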


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
-----------------------------------------------------------


On Oct. 3, 2014, 1:50 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Oct. 3, 2014, 1:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Fixed a couple of bugs and updated stress test details
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50901
-----------------------------------------------------------


I should be able to review this later today. However, as Jun also mentioned, can you please run the stress test? When I was working on the original (WIP) patch, it worked but eventually failed (due to various reasons such as corrupt message sizes, etc.) on a stress test after several segments had rolled and after several log cleaner runs - although I didn't get time to look into it, your patch should hopefully have addressed these issues.

- Joel Koshy


On Aug. 12, 2014, 4:57 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2014, 4:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Ignored messages with null keys during compaction. This is for KAFKA-1581. It is a simple fix, so combining it with this patch.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 18, 2014, 5:21 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 436-438
> > <https://reviews.apache.org/r/24214/diff/3-5/?file=657031#file657031line436>
> >
> >     Hmm, I think the original approach of throwing an exception is probably better. When handling the produce requests, we can reject messages w/o a key, if the topic is configured with compaction. Once we do that, there should be no messages with null key during compaction. If that happens, we should just fail the broker.

Ok.. I reverted the changes. We will revisit the solution in KAFKA-1581
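For reference, the produce-path check being discussed might look roughly like the following (a hedged sketch with hypothetical names, not the actual broker validation code):

```scala
// Hypothetical sketch: reject keyless messages for compacted topics at
// produce time, so the cleaner can treat a null key as fatal corruption.
case class Msg(key: Option[Array[Byte]], value: Array[Byte])

def validateForCompactedTopic(msg: Msg): Unit =
  if (msg.key.isEmpty)
    throw new IllegalArgumentException(
      "Compacted topic cannot accept a message without a key")
```

With such a check in place, a null key seen during compaction indicates broker-side corruption rather than valid producer input, which is why failing fast is reasonable there.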


> On Aug. 18, 2014, 5:21 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 479-481
> > <https://reviews.apache.org/r/24214/diff/5/?file=658590#file658590line479>
> >
> >     Could we use MemoryRecords.RecordsIterator to iterate compressed messages?

This change would require some complicated code, so I am dropping this issue.


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50893
-----------------------------------------------------------


On Sept. 23, 2014, 4:20 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Sept. 23, 2014, 4:20 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressing Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50893
-----------------------------------------------------------


Thanks for the patch. Looks good overall.

Could you run the stress test in TestLogCleaning with compression turned on and see if there is any problem?


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment88766>

    Hmm, I think the original approach of throwing an exception is probably better. When handling the produce requests, we can reject messages w/o a key, if the topic is configured with compaction. Once we do that, there should be no messages with null key during compaction. If that happens, we should just fail the broker.



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment88765>

    Could we use MemoryRecords.RecordsIterator to iterate compressed messages?



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment88763>

    Could this be named compressMessages()?


- Jun Rao


On Aug. 12, 2014, 4:57 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2014, 4:57 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Ignored messages with null keys during compaction. This is for KAFKA-1581. It is a simple fix, so combining it with this patch.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Aug. 12, 2014, 4:57 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Ignored messages with null keys during compaction. This is for KAFKA-1581. It is a simple fix, so combining it with this patch.


Diffs
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Aug. 12, 2014, 4:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Ignored messages with null keys during compaction. This is for KAFKA-1581.


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 10, 2014, 11:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 400-420
> > <https://reviews.apache.org/r/24214/diff/4/?file=657033#file657033line400>
> >
> >     Thinking about this a bit more. I am wondering if it would be better if we introduce a per-topic level log.compact.compress.codec property. During log compaction, we always write the retained data using the specified compress codec, independent of whether the original records are compressed or not. This provides the following benefits.
> >     
> >     1. Whether or not the messages were compressed originally, they can be compressed on the broker side over time. Since compact topics preserve records much longer, enabling compression on the broker side will be beneficial in general.
> >     
> >     2. As old records are removed, we still want to batch enough messages to do the compression.
> >     
> >     3. The code can be a bit simpler. We can just (deep) iterate messages (using MemoryRecords.iterator) and append retained messages to an output MemoryRecords. The output MemoryRecords will be initialized with the configured compress codec and batch size.

What you proposed is similar to KAFKA-1499. KAFKA-1499 deals with default broker-side compression configuration.
I proposed new configuration properties on KAFKA-1499. The idea is to compress the data upon reaching the server.
This is applicable to all topics (log compaction and retention).

Can you comment on KAFKA-1499?


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
-----------------------------------------------------------


On Aug. 9, 2014, 10:51 a.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 9, 2014, 10:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 10, 2014, 11:44 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 400-420
> > <https://reviews.apache.org/r/24214/diff/4/?file=657033#file657033line400>
> >
> >     Thinking about this a bit more. I am wondering if it would be better if we introduce a per-topic level log.compact.compress.codec property. During log compaction, we always write the retained data using the specified compress codec, independent of whether the original records are compressed or not. This provides the following benefits.
> >     
> >     1. Whether or not the messages were compressed originally, they can be compressed on the broker side over time. Since compact topics preserve records much longer, enabling compression on the broker side will be beneficial in general.
> >     
> >     2. As old records are removed, we still want to batch enough messages to do the compression.
> >     
> >     3. The code can be a bit simpler. We can just (deep) iterate messages (using MemoryRecords.iterator) and append retained messages to an output MemoryRecords. The output MemoryRecords will be initialized with the configured compress codec and batch size.
> 
> Manikumar Reddy O wrote:
>     What you proposed is similar to KAFKA-1499. KAFKA-1499 deals with default broker-side compression configuration.
>     I proposed new configuration properties on KAFKA-1499. The idea is to compress the data upon reaching the server.
>     This is applicable to all topics (log compaction and retention).
>     
>     Can you comment on KAFKA-1499?

Assuming we have broker-side compression (KAFKA-1499), do we still need special compression during log compaction?

1) With broker-side compression (codec: gzip, snappy, etc.)

With KAFKA-1499 we will compress all the messages with the specified compression codec. During log compaction, we write
the retained data using the same compression codec.

2) Without broker-side compression (codec: none)

If a user has not configured broker-side compression, then we write the retained messages using their
original compression type.

The current patch supports both cases.
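The codec-selection rule in the two cases above can be sketched as follows (type and function names are illustrative, not the actual LogCleaner API):

```scala
// Hedged sketch: if a broker/topic compression codec is configured
// (KAFKA-1499), retained messages are rewritten with it; otherwise they
// keep the codec of the original message set.
sealed trait Codec
case object NoCodec extends Codec
case object Gzip extends Codec
case object Snappy extends Codec

def targetCodec(topicCodec: Codec, sourceCodec: Codec): Codec =
  topicCodec match {
    case NoCodec => sourceCodec // case 2: preserve original compression type
    case c       => c           // case 1: broker-side setting wins
  }
```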


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
-----------------------------------------------------------


On Aug. 9, 2014, 10:51 a.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 9, 2014, 10:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review50128
-----------------------------------------------------------



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment87704>

    Thinking about this a bit more. I am wondering if it would be better if we introduce a per-topic level log.compact.compress.codec property. During log compaction, we always write the retained data using the specified compress codec, independent of whether the original records are compressed or not. This provides the following benefits.
    
    1. Whether or not the messages were compressed originally, they can be compressed on the broker side over time. Since compact topics preserve records much longer, enabling compression on the broker side will be beneficial in general.
    
    2. As old records are removed, we still want to batch enough messages to do the compression.
    
    3. The code can be a bit simpler. We can just (deep) iterate messages (using MemoryRecords.iterator) and append retained messages to an output MemoryRecords. The output MemoryRecords will be initialized with the configured compress codec and batch size.


- Jun Rao


On Aug. 9, 2014, 10:51 a.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 9, 2014, 10:51 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Aug. 9, 2014, 10:51 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
-------

Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Aug. 9, 2014, 10:37 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description
-------

Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/
-----------------------------------------------------------

(Updated Aug. 9, 2014, 10:30 a.m.)


Review request for kafka.


Bugs: KAFKA-1374
    https://issues.apache.org/jira/browse/KAFKA-1374


Repository: kafka


Description (updated)
-------

Addressed Jun's comments; added a few changes to LogCleaner stats for compressed messages


Diffs (updated)
-----

  core/src/main/scala/kafka/log/LogCleaner.scala c20de4ad4734c0bd83c5954fdb29464a27b91dff 
  core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 

Diff: https://reviews.apache.org/r/24214/diff/


Testing
-------


Thanks,

Manikumar Reddy O


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Jun Rao <ju...@gmail.com>.

> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 500-506
> > <https://reviews.apache.org/r/24214/diff/1/?file=649265#file649265line500>
> >
> >     Could we use Compressor.putRecord? Then, we don't have to worry about the details of the message format.
> 
> Manikumar Reddy O wrote:
>     OK, I will look into Compressor class usage. But the Compressor/MemoryRecords/Record classes are part of the clients project. Can we use these classes in core? I have not seen these classes used in the core project.

Yes, core already depends on clients. The idea is that over time, the server-side code will share some of the common components with the client.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
-----------------------------------------------------------


On Aug. 3, 2014, 5:40 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 3, 2014, 5:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> LogCleaner code decompresses the compressed messages and writes back the retained/compacted messages in compressed form
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Manikumar Reddy O <ma...@gmail.com>.

> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/LogCleaner.scala, lines 500-506
> > <https://reviews.apache.org/r/24214/diff/1/?file=649265#file649265line500>
> >
> >     Could we use Compressor.putRecord? Then, we don't have to worry about the details of the message format.

OK, I will look into Compressor class usage. But the Compressor/MemoryRecords/Record classes are part of the clients project. Can we use these classes in core? I have not seen these classes used in the core project.


> On Aug. 6, 2014, 4:29 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala, lines 43-51
> > <https://reviews.apache.org/r/24214/diff/1/?file=649266#file649266line43>
> >
> >     Would it be better to make this a parameterized test so that we can test all compression codec?

OK, will add parameterized tests.
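The shape of such a parameterized test might be a loop over codecs around one round-trip check (a sketch only; the real test would build a log with each CompressionCodec and run the cleaner):

```scala
// Stand-in for the cleaner round trip: compaction retains the last value
// per key. Seq#toMap keeps the last entry for a duplicated key.
def compact(msgs: Seq[(String, String)]): Map[String, String] = msgs.toMap

// Run the same verification once per codec instead of only for one.
for (codec <- Seq("none", "gzip", "snappy")) {
  val produced = Seq("k1" -> "v1", "k2" -> "v2", "k1" -> "v3")
  val retained = compact(produced)
  assert(retained == Map("k1" -> "v3", "k2" -> "v2"), s"codec=$codec")
}
```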


- Manikumar Reddy


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
-----------------------------------------------------------


On Aug. 3, 2014, 5:40 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 3, 2014, 5:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> LogCleaner code decompresses the compressed messages and writes back the retained/compacted messages in compressed form
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>


Re: Review Request 24214: Patch for KAFKA-1374

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24214/#review49611
-----------------------------------------------------------


Thanks for the patch. Some comments below.


core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment86836>

    We probably should just record the compressed size in stats.recopyMessage() since that's the size that actually got copied over.



core/src/main/scala/kafka/log/LogCleaner.scala
<https://reviews.apache.org/r/24214/#comment86842>

    Could we use Compressor.putRecord? Then, we don't have to worry about the details of the message format.



core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
<https://reviews.apache.org/r/24214/#comment86841>

    Would it be better to make this a parameterized test so that we can test all compression codec?


- Jun Rao


On Aug. 3, 2014, 5:40 p.m., Manikumar Reddy O wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24214/
> -----------------------------------------------------------
> 
> (Updated Aug. 3, 2014, 5:40 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> LogCleaner code decompresses the compressed messages and writes back the retained/compacted messages in compressed form
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 5bfa764638e92f217d0ff7108ec8f53193c22978 
> 
> Diff: https://reviews.apache.org/r/24214/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>