Posted to dev@kafka.apache.org by Parth Brahmbhatt <br...@gmail.com> on 2014/12/23 21:42:14 UTC

Review Request 29379: Patch for KAFKA-1788

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1788
    https://issues.apache.org/jira/browse/KAFKA-1788


Repository: kafka


Description
-------

KAFKA-1788: Adding configuration for batch.expiration.ms to ensure batches for partitions that have no leader do not stay in the accumulator forever.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f61efb35db7e0de590556e6a94a7b5cb850cdae9 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c15485d1af304ef53691d478f113f332fe67af77 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 2c9932401d573549c40f16fda8c4e3e11309cb85 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe 

Diff: https://reviews.apache.org/r/29379/diff/


Testing
-------


Thanks,

Parth Brahmbhatt


Re: Review Request 29379: Patch for KAFKA-1788

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66709
-----------------------------------------------------------


I think the basic approach in this patch looks sound and should work fine independent of any fixes to how metadata/leader info is retrieved (as discussed in the JIRA). However, it still needs some cleanup and fixes.


clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110290>

    The deque is not thread-safe, so you'll need a synchronized block. Since both branches of this if statement now require it and call deque.peekFirst(), you might just want to pull that code out into the surrounding block.
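A minimal sketch of the suggested shape, assuming a simplified batch type: the whole inspection runs while holding the deque's monitor, and peekFirst() is hoisted out of the if/else so both branches share it. QueuedBatch, SynchronizedPeekSketch, inspect() and the returned strings are hypothetical names for illustration, not the actual RecordAccumulator code.

```java
import java.util.Deque;

// Hypothetical stand-in for RecordBatch: only the creation time is modeled.
class QueuedBatch {
    final long createdMs;

    QueuedBatch(long createdMs) {
        this.createdMs = createdMs;
    }
}

class SynchronizedPeekSketch {
    // ArrayDeque is not thread-safe, so the whole inspection happens while holding its monitor.
    static String inspect(Deque<QueuedBatch> deque, boolean leaderKnown,
                          long nowMs, long batchExpirationMs) {
        synchronized (deque) {
            // Hoisted out of the if/else: both branches need the head batch.
            QueuedBatch first = deque.peekFirst();
            if (first == null) {
                return "empty";
            }
            if (!leaderKnown) {
                // No leader: the only question is whether the batch has waited too long.
                return nowMs - first.createdMs >= batchExpirationMs ? "expired" : "waiting-for-leader";
            }
            // Leader known: the existing readiness checks would run here.
            return "check-readiness";
        }
    }
}
```

As in the patch under review, a batch whose leader is known is never considered for expiration in this sketch; only the missing-leader branch checks the timeout.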



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110292>

    Looks like this is just a leftover setting from development? This should be using this.batchExpirationMs.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110294>

    Normally this sequence of batch.done() and deallocate() is called from Sender.completeBatch(), which also calls Sender.sensors.recordErrors() when there was an error, as there was in this case. Any way to rework this so the error can be properly recorded in metrics?
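To illustrate why routing through one completion path matters, here is a simplified sketch in which the error metric is recorded before done() and deallocation, so an expired batch is counted the same way as a failed produce request. CompletableBatch, CompletionPathSketch and the two lists standing in for Sender.sensors.recordErrors() and deallocate() are hypothetical; the real Sender.completeBatch() signature is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RecordBatch; done() mirrors the completion call named in the review.
class CompletableBatch {
    final String topicPartition;
    boolean completed = false;
    RuntimeException error = null;

    CompletableBatch(String topicPartition) {
        this.topicPartition = topicPartition;
    }

    void done(RuntimeException error) {
        this.completed = true;
        this.error = error;
    }
}

class CompletionPathSketch {
    // Stand-in for Sender.sensors.recordErrors(...): records the topic-partition of each error.
    final List<String> recordedErrors = new ArrayList<>();
    // Stand-in for the accumulator's deallocate(...): tracks batches whose memory was released.
    final List<CompletableBatch> deallocated = new ArrayList<>();

    // Single completion path, so expired batches get the same metrics as failed produce requests.
    void completeBatch(CompletableBatch batch, RuntimeException error) {
        if (error != null) {
            recordedErrors.add(batch.topicPartition);
        }
        batch.done(error);
        deallocated.add(batch);
    }
}
```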



clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
<https://reviews.apache.org/r/29379/#comment110295>

    This would probably be clearer if it was just batchExpirationMs.


- Ewen Cheslack-Postava




Re: Review Request 29379: Patch for KAFKA-1788

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66942
-----------------------------------------------------------


Thinking about this more, I think this patch only covers one very specific case of the bug. Since we only check for timeouts when the leader lookup returns null, this only handles the case where we're unable to update metadata (which is the only reason we would consistently be missing leader info). It doesn't handle other important cases, e.g. we know the leader but can't send for a long time because we got disconnected and can't reconnect, or we're still connected but requests are getting through so slowly that the batch sits in the queue for a long time.

I think it might make sense to restructure this a bit to address those cases and handle the issue with the missing stats/completeBatch call. How about always checking expiration while iterating through all these items, returning the expired batches in ReadyCheckResult as a collection like we do with readyNodes, and letting the caller, Sender, use completeBatch() to clean up? This avoids some duplicate code, gets the error stats right, and makes sure batches are always considered for expiration, so it should completely solve the problem. You might need to add a while() loop to pull off all the expired batches and then use the existing code to process the next remaining batch (if there is one left).
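A rough sketch of the accumulator side of this proposal, under simplifying assumptions: batches sit in per-partition deques in creation order, a while() loop drains every expired head batch regardless of leader status, and the expired batches are returned next to readyNodes. AgedBatch, ReadyAndExpired, ExpiringReadySketch and the simplified readiness check are hypothetical, not the real RecordAccumulator or ReadyCheckResult API.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for RecordBatch; batches are appended in creation order.
class AgedBatch {
    final long createdMs;

    AgedBatch(long createdMs) {
        this.createdMs = createdMs;
    }
}

// Hypothetical extension of ReadyCheckResult: expired batches travel alongside readyNodes.
class ReadyAndExpired {
    final Set<String> readyNodes = new HashSet<>();
    final List<AgedBatch> expiredBatches = new ArrayList<>();
}

class ExpiringReadySketch {
    // batches: partition -> queued batches; leaders: partition -> leader node (absent if unknown).
    static ReadyAndExpired ready(Map<String, Deque<AgedBatch>> batches, Map<String, String> leaders,
                                 long nowMs, long batchExpirationMs) {
        ReadyAndExpired result = new ReadyAndExpired();
        for (Map.Entry<String, Deque<AgedBatch>> entry : batches.entrySet()) {
            Deque<AgedBatch> deque = entry.getValue();
            synchronized (deque) {
                // Expiration is checked whether or not the leader is known, so a stalled or
                // disconnected leader is handled the same way as missing metadata.
                while (!deque.isEmpty() && nowMs - deque.peekFirst().createdMs >= batchExpirationMs) {
                    result.expiredBatches.add(deque.pollFirst());
                }
                // Simplified version of the existing readiness check for the remaining head batch.
                String leader = leaders.get(entry.getKey());
                if (leader != null && !deque.isEmpty()) {
                    result.readyNodes.add(leader);
                }
            }
        }
        return result;
    }
}
```

The caller would then hand each entry of expiredBatches to its completion path, which keeps error metrics and cleanup in one place.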

Another issue: the current patch relies on ready() being called frequently for the batches to be removed promptly after they expire. However, there are conditions where poll() will be called with large timeouts, potentially up to 5 minutes using the default settings. If we followed the approach described above, we'd probably need to track another value similar to nextReadyCheckDelayMs which would indicate when we would next need to wake up to expire the batch with the earliest expiration time.
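One way to bound the poll() timeout, sketched under stated assumptions: take the minimum of nextReadyCheckDelayMs (the value named above) and the time remaining until the earliest batch expires. Only the head batch of each partition needs to be considered, because batches in a deque are ordered by creation time. ExpiryWakeupSketch, pollTimeoutMs() and headBatchCreatedMs are hypothetical names.

```java
import java.util.List;

class ExpiryWakeupSketch {
    // nextReadyCheckDelayMs is the existing value named in the review; the rest is assumed.
    // headBatchCreatedMs holds the creation time of the oldest (head) batch of each partition,
    // which is enough because batches in a deque are ordered by creation time.
    static long pollTimeoutMs(long nextReadyCheckDelayMs, List<Long> headBatchCreatedMs,
                              long nowMs, long batchExpirationMs) {
        long timeoutMs = nextReadyCheckDelayMs;
        for (long createdMs : headBatchCreatedMs) {
            // Time left until this batch expires, clamped at zero for already-expired batches.
            long untilExpiryMs = Math.max(0L, createdMs + batchExpirationMs - nowMs);
            timeoutMs = Math.min(timeoutMs, untilExpiryMs);
        }
        return timeoutMs;
    }
}
```

For example, with a 5-minute (300000 ms) ready-check delay, a head batch created at 1000 ms, a 30000 ms expiration and a current time of 5000 ms, the sender would wake after 26000 ms instead of 300000 ms.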

The fixes you made did address the problems. It looks like this no longer applies to trunk due to 50b734690, so it'll need rebasing.

- Ewen Cheslack-Postava




Re: Review Request 29379: Patch for KAFKA-1788

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review73512
-----------------------------------------------------------


Minor comments. I think the biggest remaining issue is getting Sender.completeBatch() called, since that's the only way errors and retries will be handled properly. I left a suggestion about a possible approach to do that while still maintaining the current layering of Sender and RecordAccumulator.


clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/29379/#comment119869>

    An hour seems awfully long, what's the reasoning behind this default?



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment119870>

    Add final


- Ewen Cheslack-Postava




Re: Review Request 29379: Patch for KAFKA-1788

Posted by Parth Brahmbhatt <br...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/
-----------------------------------------------------------

(Updated Jan. 6, 2015, 6:44 p.m.)


Review request for kafka.


Bugs: KAFKA-1788
    https://issues.apache.org/jira/browse/KAFKA-1788


Repository: kafka


Description
-------

Merge remote-tracking branch 'origin/trunk' into KAFKA-1788


KAFKA-1788: addressed Ewen's comments.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f61efb35db7e0de590556e6a94a7b5cb850cdae9 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c15485d1af304ef53691d478f113f332fe67af77 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 2c9932401d573549c40f16fda8c4e3e11309cb85 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe 

Diff: https://reviews.apache.org/r/29379/diff/


Testing
-------

Unit test added. 


Thanks,

Parth Brahmbhatt


Re: Review Request 29379: Patch for KAFKA-1788

Posted by Ewen Cheslack-Postava <me...@ewencp.org>.

> On Jan. 6, 2015, 6:43 p.m., Parth Brahmbhatt wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 225
> > <https://reviews.apache.org/r/29379/diff/1/?file=799704#file799704line225>
> >
> >     sender.completeBatch() is only called as part of produce response handling or on disconnect, and neither is ever invoked when there is no broker. I could add the sender as a member of the record accumulator or pass it as a callback argument to the ready() method, but all of these options are too hacky.
> >     
> >     Let me know if you see some other alternative.

Agree that those options are hacky. Maybe return the information in ReadyCheckResult? RecordAccumulator.ready() is only called from Sender.run(), which could then handle calling completeBatch() on any expired batches. This also has the benefit of integrating with the existing retry logic, although I'm not sure if we want to treat this as a retriable error or not.
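A sketch of the Sender side of this suggestion, assuming ready() hands back a list of expired batches: the sender thread completes each one through a completeBatch()-style method, which either retries or fails the batch. Whether expiry should count as retriable is left open above, so the hypothetical expiryIsRetriable flag models both choices. ExpiredBatchHandlerSketch, Batch and handleExpired() are illustrative names, not Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Sender side: ready() returns expired batches and the sender
// thread completes them through the same path used for produce responses and disconnects.
class ExpiredBatchHandlerSketch {
    static final class Batch {
        final String topicPartition;
        final int attempts;

        Batch(String topicPartition, int attempts) {
            this.topicPartition = topicPartition;
            this.attempts = attempts;
        }
    }

    final int retries;
    // Whether expiry should be retriable is left open in the review; this flag models both choices.
    final boolean expiryIsRetriable;
    final List<Batch> requeued = new ArrayList<>();
    final List<Batch> failed = new ArrayList<>();

    ExpiredBatchHandlerSketch(int retries, boolean expiryIsRetriable) {
        this.retries = retries;
        this.expiryIsRetriable = expiryIsRetriable;
    }

    // Called from the sender loop with the expired batches carried in the ready-check result.
    void handleExpired(List<Batch> expiredBatches) {
        for (Batch batch : expiredBatches) {
            completeBatch(batch);
        }
    }

    // Simplified stand-in for Sender.completeBatch(): retry when allowed, otherwise fail the batch.
    private void completeBatch(Batch batch) {
        if (expiryIsRetriable && batch.attempts < retries) {
            requeued.add(batch);
        } else {
            failed.add(batch);
        }
    }
}
```

Keeping the decision inside completeBatch() means RecordAccumulator never needs a reference to Sender, which preserves the existing layering.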


- Ewen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66879
-----------------------------------------------------------




Re: Review Request 29379: Patch for KAFKA-1788

Posted by Parth Brahmbhatt <br...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/#review66879
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110566>

    Pulled up the synchronized deque block.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110567>

    Yes, replaced with batchExpirationMillis.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/29379/#comment110569>

    sender.completeBatch() is only called as part of produce response handling or on disconnect, and neither is ever invoked when there is no broker. I could add the sender as a member of the record accumulator or pass it as a callback argument to the ready() method, but all of these options are too hacky.
    
    Let me know if you see some other alternative.



clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
<https://reviews.apache.org/r/29379/#comment110570>

    done.


- Parth Brahmbhatt




Re: Review Request 29379: Patch for KAFKA-1788

Posted by Parth Brahmbhatt <br...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/
-----------------------------------------------------------

(Updated Jan. 6, 2015, 6:42 p.m.)


Review request for kafka.


Bugs: KAFKA-1788
    https://issues.apache.org/jira/browse/KAFKA-1788


Repository: kafka


Description (updated)
-------

Merge remote-tracking branch 'origin/trunk' into KAFKA-1788


KAFKA-1788: addressed Ewen's comments.


Diffs (updated)
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f61efb35db7e0de590556e6a94a7b5cb850cdae9 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c15485d1af304ef53691d478f113f332fe67af77 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 2c9932401d573549c40f16fda8c4e3e11309cb85 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe 

Diff: https://reviews.apache.org/r/29379/diff/


Testing
-------

Unit test added. 


Thanks,

Parth Brahmbhatt


Re: Review Request 29379: Patch for KAFKA-1788

Posted by Parth Brahmbhatt <br...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29379/
-----------------------------------------------------------

(Updated Dec. 23, 2014, 8:44 p.m.)


Review request for kafka.


Changes
-------

KAFKA-1788: Adding configuration for batch.expiration.ms to ensure batches for partitions that have no leader do not stay in the accumulator forever.


Bugs: KAFKA-1788
    https://issues.apache.org/jira/browse/KAFKA-1788


Repository: kafka


Description
-------

KAFKA-1788: Adding configuration for batch.expiration.ms to ensure batches for partitions that have no leader do not stay in the accumulator forever.


Diffs
-----

  clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java f61efb35db7e0de590556e6a94a7b5cb850cdae9 
  clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java a893d88c2f4e21509b6c70d6817b4b2cdd0fd657 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c15485d1af304ef53691d478f113f332fe67af77 
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe 
  clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 2c9932401d573549c40f16fda8c4e3e11309cb85 
  clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java ef2ca65cabe97b909f17b62027a1bb06827e88fe 

Diff: https://reviews.apache.org/r/29379/diff/


Testing (updated)
-------

Unit test added. 


Thanks,

Parth Brahmbhatt