Posted to dev@kafka.apache.org by Jun Rao <ju...@gmail.com> on 2015/09/01 03:02:58 UTC

Re: Review Request 36858: Patch for KAFKA-2120

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review97213
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (lines 361 - 366)
<https://reviews.apache.org/r/36858/#comment153008>

    We need to think through this logic a bit more. The patch here disconnects the connection from the selector, but doesn't mark the connectionState as disconnected immediately. The connectionState only changes to disconnected on the next networkClient.poll(). The issue with this approach is that selector.disconnect actually cancels the socket key, so from that moment the connection is no longer usable, yet the connectionState is still connected. A client can check that the connection is ready and then make a send call. The send will then get a CancelledKeyException, which is weird.
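
    To make the failure mode concrete, here is a minimal standalone sketch using plain java.nio rather than Kafka's Selector. The State enum and the class name are hypothetical stand-ins for connectionState; only the SelectionKey behaviour is real. It cancels a key, leaves the recorded state untouched, and then tries to use the key the way a send would.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class StaleConnectionStateDemo {
    // Hypothetical stand-in for the client's view of the connection.
    enum State { CONNECTED, DISCONNECTED }

    /** Returns true if using the key after cancel() raised CancelledKeyException. */
    static boolean sendAfterCancelFails() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
            State state = State.CONNECTED;

            // Analogous to selector.disconnect: the key is cancelled right away...
            key.cancel();
            // ...but the recorded state is deliberately left as CONNECTED.

            if (state == State.CONNECTED) {
                try {
                    // A send would register write interest on the key.
                    key.interestOps(SelectionKey.OP_WRITE);
                    return false;
                } catch (CancelledKeyException e) {
                    return true;
                }
            }
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("send after cancel failed: " + sendAfterCancelFails());
    }
}
```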
    
    We are dealing with a similar issue in KAFKA-2411. Our current thinking is to have a networkClient.disconnect() that closes the socket key as well as removes the client from connectionState. This will make the state in networkClient consistent after each poll() call. If we have that, we can just call networkClient.disconnect() in handleTimedOutRequests() and handle those disconnected connections immediately. Then, we don't need to maintain the clientDisconnects state in Selector.
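
    A rough sketch of the idea, assuming a disconnect() that updates the selector and the connection state in one step. FakeSelector, Client and the string node ids are hypothetical and only model the bookkeeping; this is not the actual NetworkClient code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ImmediateDisconnectSketch {
    enum State { CONNECTED }

    /** Hypothetical stand-in for the selector: only tracks which keys are open. */
    static class FakeSelector {
        final Set<String> openKeys = new HashSet<>();
        void connect(String node) { openKeys.add(node); }
        void close(String node) { openKeys.remove(node); }
    }

    /** Hypothetical stand-in for the network client. */
    static class Client {
        final FakeSelector selector = new FakeSelector();
        final Map<String, State> connectionStates = new HashMap<>();

        void connect(String node) {
            selector.connect(node);
            connectionStates.put(node, State.CONNECTED);
        }

        // Close the key and drop the state together, so the two can never disagree.
        void disconnect(String node) {
            selector.close(node);
            connectionStates.remove(node);
        }

        boolean isReady(String node) {
            return connectionStates.get(node) == State.CONNECTED;
        }
    }
}
```

    With this shape, a caller that checks isReady() right after a timeout-driven disconnect() sees the node as not ready and never attempts the send.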


- Jun Rao


On Aug. 12, 2015, 5:59 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2015, 5:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for Kip-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java dc8f0f115bcda893c95d17c0a57be8d14518d034 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 7ab2503794ff3aab39df881bd9fbae6547827d3b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 0e51d7bd461d253f4396a5b6ca7cd391658807fa 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java d35b421a515074d964c7fccb73d260b847ea5f00 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ed99e9bdf7c4ea7a6d4555d4488cf8ed0b80641b 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java aa264202f2724907924985a5ecbe74afc4c6c04b 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java 4cb1e50d6c4ed55241aeaef1d3af09def5274103 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java 06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 0baf16e55046a2f49f6431e01d52c323c95eddf0 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java ce20111ac434eb8c74585e9c63757bb9d60a832f 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 9133d85342b11ba2c9888d4d2804d181831e7a8e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 43238ceaad0322e39802b615bb805b895336a009 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java 5b2e4ffaeab7127648db608c179703b27b577414 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 158f9829ff64a969008f699e40c51e918287859e 
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 0335cc64013ffe2cdf1c4879e86e11ec8c526712 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ee94011894b46864614b97bbd2a98375a7d3f20b 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala eb169d8b33c27d598cc24e5a2e5f78b789fa38d3 
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>


Re: Review Request 36858: Patch for KAFKA-2120

Posted by Mayuresh Gharat <gh...@gmail.com>.

> On Sept. 1, 2015, 1:02 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, lines 362-367
> > <https://reviews.apache.org/r/36858/diff/5/?file=1038488#file1038488line362>
> >
> >     We need to think through this logic a bit more. The patch here disconnects the connection from the selector, but doesn't mark the connectionState as disconnected immediately. The connectionState only changes to disconnected on the next networkClient.poll(). The issue with this approach is that selector.disconnect actually cancels the socket key, so from that moment the connection is no longer usable, yet the connectionState is still connected. A client can check that the connection is ready and then make a send call. The send will then get a CancelledKeyException, which is weird.
> >     
> >     We are dealing with a similar issue in KAFKA-2411. Our current thinking is to have a networkClient.disconnect() that closes the socket key as well as removes the client from connectionState. This will make the state in networkClient consistent after each poll() call. If we have that, we can just call networkClient.disconnect() in handleTimedOutRequests() and handle those disconnected connections immediately. Then, we don't need to maintain the clientDisconnects state in Selector.
> 
> Mayuresh Gharat wrote:
>     Thanks a lot Jun.
>     I was thinking about something similar when I was rebasing the patch yesterday on the latest trunk. The initial code that I wrote took the disconnected nodes and added them to the disconnected list in disconnect() in Selector. But that imposes a dependency that handleTimeout() should be called before handleDisconnections(), because every poll clears the disconnected list. I will test the approach you suggested and upload a patch for this.

Thinking more on this, I feel that we can add the explicitly disconnected node to the selector's disconnected list (List<disconnected>) in the disconnect() function of Selector, and call handleTimeOut() before handleDisconnections() in NetworkClient. That will handle all the above requirements in the same poll(). The only issue with this is that it does not match the javadoc for disconnect() ("*The disconnection is asynchronous and will not be processed until the next {@link #poll(long) poll()} call.*").
The reason we need to have the explicitly disconnected node in the selector's disconnected list is that we need all the functionality in handleDisconnections() (metadataUpdate, clearing the in-flight requests for that node) and don't want to duplicate code.
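
To show why the call order matters here, a small sketch under the assumption that every poll() clears the selector's disconnected list before anything else runs. FakeSelector, pollOnce and the timeoutsFirst flag are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SamePollOrderingSketch {
    /** Hypothetical selector whose disconnected list is cleared on every poll(). */
    static class FakeSelector {
        private final List<String> disconnected = new ArrayList<>();
        void poll() { disconnected.clear(); }
        void disconnect(String node) { disconnected.add(node); }
        List<String> disconnected() { return disconnected; }
    }

    /** Runs one poll and returns the nodes that handleDisconnections processed. */
    static List<String> pollOnce(FakeSelector selector, List<String> timedOut,
                                 boolean timeoutsFirst) {
        selector.poll();
        List<String> handled = new ArrayList<>();
        if (timeoutsFirst) {
            handleTimeout(selector, timedOut);
            handleDisconnections(selector, handled);
        } else {
            handleDisconnections(selector, handled);
            handleTimeout(selector, timedOut);
        }
        return handled;
    }

    // Explicitly disconnect every timed-out node through the selector.
    static void handleTimeout(FakeSelector selector, List<String> timedOut) {
        for (String node : timedOut) {
            selector.disconnect(node);
        }
    }

    // Process whatever the selector currently reports as disconnected.
    static void handleDisconnections(FakeSelector selector, List<String> handled) {
        handled.addAll(selector.disconnected());
    }
}
```

With timeoutsFirst set to true the timed-out node is handled in the same poll. With the order reversed it is added to the list too late, and the next poll() clears it before handleDisconnections ever sees it.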

The other way is to maintain a list of disconnected nodes from handleTimeOut(). In handleDisconnections(), iterate over the merged list of disconnected nodes from this list and the selector's disconnected list. This takes into consideration the approach that you mentioned above. What do you think?
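
A minimal sketch of the merge step, assuming node ids are plain strings. The nodesToHandle name is hypothetical, and using a set so that a node reported by both sources is handled only once is my own choice, not something decided in this thread.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MergedDisconnectsSketch {
    /**
     * Combines the nodes disconnected by the timeout handling with the nodes
     * the selector reported as disconnected, keeping first-seen order and
     * dropping duplicates.
     */
    static Set<String> nodesToHandle(List<String> timedOutNodes,
                                     List<String> selectorDisconnected) {
        Set<String> merged = new LinkedHashSet<>(timedOutNodes);
        merged.addAll(selectorDisconnected);
        return merged;
    }
}
```

handleDisconnections() would then iterate over the returned set, so the metadata update and in-flight request cleanup stay in one place.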


- Mayuresh




Re: Review Request 36858: Patch for KAFKA-2120

Posted by Mayuresh Gharat <gh...@gmail.com>.

> On Sept. 1, 2015, 1:02 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, lines 362-367
> > <https://reviews.apache.org/r/36858/diff/5/?file=1038488#file1038488line362>
> >
> >     We need to think through this logic a bit more. The patch here disconnects the connection from the selector, but doesn't mark the connectionState as disconnected immediately. The connectionState only changes to disconnected on the next networkClient.poll(). The issue with this approach is that selector.disconnect actually cancels the socket key, so from that moment the connection is no longer usable, yet the connectionState is still connected. A client can check that the connection is ready and then make a send call. The send will then get a CancelledKeyException, which is weird.
> >     
> >     We are dealing with a similar issue in KAFKA-2411. Our current thinking is to have a networkClient.disconnect() that closes the socket key as well as removes the client from connectionState. This will make the state in networkClient consistent after each poll() call. If we have that, we can just call networkClient.disconnect() in handleTimedOutRequests() and handle those disconnected connections immediately. Then, we don't need to maintain the clientDisconnects state in Selector.

Thanks a lot Jun.
I was thinking about something similar when I was rebasing the patch yesterday on the latest trunk. The initial code that I wrote took the disconnected nodes and added them to the disconnected list in disconnect() in Selector. But that imposes a dependency that handleTimeout() should be called before handleDisconnections(), because every poll clears the disconnected list. I will test the approach you suggested and upload a patch for this.


- Mayuresh

