Posted to dev@kafka.apache.org by Dong Lin <li...@gmail.com> on 2015/06/03 01:54:01 UTC
Review Request 34965: Patch for KAFKA-2241
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
-----------------------------------------------------------
Review request for kafka.
Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241
Repository: kafka
Description
-------
KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
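The general problem in the title can be illustrated with plain NIO. The following is a minimal sketch under stated assumptions, not the patch itself. It uses an in-memory `java.nio.channels.Pipe` in place of a broker socket, and `BlockedReader`, `shutdown()` and `BlockedReadDemo` are hypothetical names. A reader thread blocks in `ReadableByteChannel.read(buffer)`. Closing the channel from another thread makes that read fail with `AsynchronousCloseException` (a subclass of `ClosedChannelException`), so the thread can exit. The sketch does not reproduce the JVM issue referenced later in this thread.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{ClosedChannelException, Pipe}
import java.util.concurrent.CountDownLatch

// Hypothetical reader thread standing in for a fetcher blocked on the network.
class BlockedReader(source: Pipe.SourceChannel) extends Thread("blocked-reader") {
  @volatile var unblockedByClose = false
  val started = new CountDownLatch(1)

  override def run(): Unit = {
    val buffer = ByteBuffer.allocate(16)
    started.countDown()
    try {
      source.read(buffer) // blocks: nothing is ever written to the pipe
    } catch {
      // AsynchronousCloseException if closed mid-read; a plain
      // ClosedChannelException if closed before read() started.
      case _: ClosedChannelException => unblockedByClose = true
    }
  }

  // Hypothetical shutdown(): close the channel instead of waiting for data.
  def shutdown(): Unit = source.close()
}

object BlockedReadDemo {
  // Returns true if the reader thread exited because its channel was closed.
  def run(): Boolean = {
    val pipe = Pipe.open()
    val reader = new BlockedReader(pipe.source())
    reader.start()
    reader.started.await()
    Thread.sleep(100) // best effort: let the reader enter read() first
    reader.shutdown()
    reader.join(5000)
    pipe.sink().close()
    !reader.isAlive && reader.unblockedByClose
  }
}
```

The `Thread.sleep` only makes it likely that the close arrives mid-read. Either way the reader exits, because a read on an already-closed channel also throws `ClosedChannelException`.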
Diffs
-----
core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
Diff: https://reviews.apache.org/r/34965/diff/
Testing
-------
Thanks,
Dong Lin
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
> On June 3, 2015, 6:53 p.m., Aditya Auradkar wrote:
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 61
> > <https://reviews.apache.org/r/34965/diff/1/?file=976904#file976904line61>
> >
> > Why not simply modify the close method to disconnect outside the synchronized block? I don't feel strongly about it; I'm just curious.
Sure. I will think about this after finishing the other work.
> On June 3, 2015, 6:53 p.m., Aditya Auradkar wrote:
> > core/src/main/scala/kafka/consumer/SimpleConsumer.scala, line 63
> > <https://reviews.apache.org/r/34965/diff/1/?file=976904#file976904line63>
> >
> > I think this needs to be volatile or AtomicBoolean
You are right. Fixed.
Thanks.
- Dong
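For reference, the two options Aditya mentions look roughly like this. This is a hypothetical sketch: `VolatileFlag` and `AtomicFlag` are illustrative names, not Kafka classes. `@volatile` makes a write to the flag by the shutdown thread visible to a thread that polls it. `AtomicBoolean.compareAndSet` additionally lets exactly one caller win when several threads race to close.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Option 1: a @volatile field. Writes become visible to other threads without locking.
class VolatileFlag {
  @volatile private var isClosed = false
  def close(): Unit = isClosed = true
  def closed: Boolean = isClosed
}

// Option 2: an AtomicBoolean. compareAndSet lets exactly one caller perform
// the close, which also makes close() idempotent.
class AtomicFlag {
  private val isClosed = new AtomicBoolean(false)
  // Returns true only for the call that actually flipped the flag.
  def close(): Boolean = isClosed.compareAndSet(false, true)
  def closed: Boolean = isClosed.get
}
```

With a plain `var`, the Java memory model permits the JIT to hoist the read out of a polling loop. The other thread might then never observe the change.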
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review86460
-----------------------------------------------------------
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review86460
-----------------------------------------------------------
This may be hard to do in a unit test, but can you check if it's feasible to write a test case?
core/src/main/scala/kafka/consumer/SimpleConsumer.scala
<https://reviews.apache.org/r/34965/#comment138488>
Why not simply modify the close method to disconnect outside the synchronized block? I don't feel strongly about it; I'm just curious.
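Here is a toy model of that suggestion under stated assumptions. `LockFreeCloseConsumer` is hypothetical, and a `CountDownLatch` stands in for the socket so that the example is deterministic. `sendRequest()` holds `lock` while it blocks. A `close()` that first acquired `lock` would queue behind it. This `close()` instead sets the flag and disconnects without taking the lock, which releases the blocked call.

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for SimpleConsumer; a latch plays the role of the socket.
class LockFreeCloseConsumer {
  private val lock = new Object
  @volatile private var isClosed = false
  private val disconnected = new CountDownLatch(1) // released by disconnect()
  val inRequest = new CountDownLatch(1)            // lets callers wait for the "read"

  // Holds the lock for the whole request, as a blocking send/receive would.
  def sendRequest(): String = lock.synchronized {
    inRequest.countDown()
    disconnected.await() // simulated blocking read
    if (isClosed) "aborted" else "response"
  }

  private def disconnect(): Unit = disconnected.countDown()

  // Does not take `lock`, so it cannot get stuck behind a blocked sendRequest().
  def close(): Unit = {
    isClosed = true
    disconnect()
  }
}
```

The trade-off is that `close()` no longer serializes with `sendRequest()`. That gap is what allows an in-flight request to re-create the connection after a close.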
core/src/main/scala/kafka/consumer/SimpleConsumer.scala
<https://reviews.apache.org/r/34965/#comment138486>
I think this needs to be volatile or AtomicBoolean
- Aditya Auradkar
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
-----------------------------------------------------------
(Updated July 13, 2015, 9:52 p.m.)
Review request for kafka.
Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241
Repository: kafka
Description
-------
KAFKA-2241; AbstractFetcherThread.shutdown() should not block
Diffs (updated)
-----
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
Diff: https://reviews.apache.org/r/34965/diff/
Testing
-------
Thanks,
Dong Lin
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
-----------------------------------------------------------
(Updated July 13, 2015, 8:30 p.m.)
Review request for kafka.
Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241
Repository: kafka
Description (updated)
-------
KAFKA-2241; AbstractFetcherThread.shutdown() should not block
Diffs (updated)
-----
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
Diff: https://reviews.apache.org/r/34965/diff/
Testing
-------
Thanks,
Dong Lin
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
-----------------------------------------------------------
(Updated July 9, 2015, 10:35 p.m.)
Review request for kafka.
Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241
Repository: kafka
Description
-------
KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
Diffs (updated)
-----
core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
Diff: https://reviews.apache.org/r/34965/diff/
Testing
-------
Thanks,
Dong Lin
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Joel Koshy <jj...@gmail.com>.
> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76>
> >
> > You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of.
>
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid confusing users.
>
> Joel Koshy wrote:
> Would it work to just modify what you had before in `forceClose` to:
> ```
> disconnect();
> close();
> ```
>
> Dong Lin wrote:
> I think that won't work. The event sequence you described will still cause a problem.
>
> The following sequence of events may happen:
>
> - forceClose(), including its call to close(), is executed by thread 1
> - thread 2 calls sendRequest(); blockingChannel.send(request) throws ClosedChannelException, which triggers reconnect().
>
> It is possible to make this work by changing the way sendRequest() handles ClosedChannelException. But I think the API in the second patch is better.
>
> Which solution do you prefer?
True, that won't work. Another option may be to change `connect` to throw a `KafkaException` if `isClosed` is true. Your latest patch may be better, though, since it avoids modifying the existing API (and only adds to it), although I think `interruptConsumer` would be a better name. The javadoc can clearly state that it actually disconnects the consumer due to the JVM bug (and link to the stackoverflow question).
- Joel
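Joel's alternative, making `connect` throw when `isClosed` is true, might look like the hypothetical sketch below. `GuardedConsumer` and `ConsumerClosedException` are illustrative names. `ConsumerClosedException` is a stand-in for Kafka's `KafkaException` that keeps the block self-contained. `send()` simulates `blockingChannel.send(request)` by throwing `ClosedChannelException` when disconnected.

```scala
import java.nio.channels.ClosedChannelException

// Stand-in for Kafka's KafkaException so the sketch compiles on its own.
class ConsumerClosedException(msg: String) extends RuntimeException(msg)

// Hypothetical consumer whose reconnect path refuses to reopen after close.
class GuardedConsumer {
  private val lock = new Object
  @volatile private var isClosed = false
  @volatile var connected = false

  private def connect(): Unit = {
    if (isClosed) throw new ConsumerClosedException("consumer is closed; not reconnecting")
    connected = true
  }
  private def disconnect(): Unit = connected = false

  // Simulated blockingChannel.send(request).
  private def send(request: String): String = {
    if (!connected) throw new ClosedChannelException
    s"ack:$request"
  }

  def sendRequest(request: String): String = lock.synchronized {
    try send(request)
    catch {
      case _: ClosedChannelException =>
        disconnect()
        connect() // throws instead of reconnecting once the consumer is closed
        send(request)
    }
  }

  // Sets the flag before disconnecting, and does not take the lock.
  def forceClose(): Unit = { isClosed = true; disconnect() }
}
```

`connect()` checks `isClosed` before it connects, so a `forceClose()` from another thread can still land between the check and the assignment. The guard narrows the race rather than eliminating it.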
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76>
> >
> > You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of.
>
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid confusing users.
>
> Joel Koshy wrote:
> Would it work to just modify what you had before in `forceClose` to:
> ```
> disconnect();
> close();
> ```
I think that won't work. The event sequence you described will still cause a problem.
The following sequence of events may happen:
- forceClose(), including its call to close(), is executed by thread 1
- thread 2 calls sendRequest(); blockingChannel.send(request) throws ClosedChannelException, which triggers reconnect().
It is possible to make this work by changing the way sendRequest() handles ClosedChannelException. But I think the API in the second patch is better.
Which solution do you prefer?
- Dong
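The interleaving Dong lists can be replayed deterministically on a single thread. This is a toy sketch that assumes a hypothetical `UnguardedConsumer`, whose `sendRequest()` reconnects and retries on `ClosedChannelException` without consulting `isClosed`. It models only the order of events, not real sockets or locks.

```scala
import java.nio.channels.ClosedChannelException

// Hypothetical consumer whose retry path ignores isClosed.
class UnguardedConsumer {
  @volatile var isClosed = false
  @volatile var connected = false

  def connect(): Unit = connected = true
  def disconnect(): Unit = connected = false
  def close(): Unit = { disconnect(); isClosed = true }
  def forceClose(): Unit = { disconnect(); close() } // the disconnect(); close() variant under discussion

  // Simulated blockingChannel.send(request).
  private def send(request: String): String = {
    if (!connected) throw new ClosedChannelException
    s"ack:$request"
  }

  def sendRequest(request: String): String =
    try send(request)
    catch {
      case _: ClosedChannelException =>
        disconnect()
        connect() // reopens the channel even though the consumer is closed
        send(request)
    }
}
```

After thread 1's `forceClose()` and thread 2's later `sendRequest()`, the consumer reports itself closed while holding a live connection.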
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 71
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line71>
> >
> > As you probably noticed, synchronization in the AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. Since the simple consumer is force-closed without the SimpleConsumer’s lock, consider the following sequence:
> > - You call forceClose
> > - In the meantime (before isClosed is set to true) an ongoing call to sendRequest recreates the connection
> > - The fetcher thread will subsequently exit (since the ShutdownableThread’s isRunning flag is false)
> > - So even though the SimpleConsumer is _closed_ at that point, the connection will remain
> >
> > Can you verify or is it a non-issue?
Thanks for the catch! Yes, this is an issue. After looking through the code carefully, I think we have to keep the simpleConsumer.close() call to avoid this problem.
I have also changed the function name and added comments to document the use of the new API.
> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76>
> >
> > You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of.
I agree. I have updated the code and comments to hopefully avoid confusing users.
- Dong
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Joel Koshy <jj...@gmail.com>.
> On July 9, 2015, 7:19 p.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/AbstractFetcherThread.scala, line 76
> > <https://reviews.apache.org/r/34965/diff/2/?file=977751#file977751line76>
> >
> > You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of.
>
> Dong Lin wrote:
> I agree. I have updated the code and comments to hopefully avoid confusing users.
Would it work to just modify what you had before in `forceClose` to:
```
disconnect();
close();
```
- Joel
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/#review91159
-----------------------------------------------------------
core/src/main/scala/kafka/server/AbstractFetcherThread.scala (line 71)
<https://reviews.apache.org/r/34965/#comment144446>
As you probably noticed, synchronization in the AbstractFetcherManager/Thread classes is unfortunately pretty nightmarish. Since the simple consumer is force-closed without the SimpleConsumer’s lock, consider the following sequence:
- You call forceClose
- In the meantime (before isClosed is set to true) an ongoing call to sendRequest recreates the connection
- The fetcher thread will subsequently exit (since the ShutdownableThread’s isRunning flag is false)
- So even though the SimpleConsumer is _closed_ at that point, the connection will remain
Can you verify or is it a non-issue?
core/src/main/scala/kafka/server/AbstractFetcherThread.scala
<https://reviews.apache.org/r/34965/#comment144448>
You could get around the above by retaining this call to simpleConsumer.close (although it would be mostly redundant). However this is still not ideal, since it is a caveat that the user of the (public) forceClose API needs to be aware of.
- Joel Koshy
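Joel's workaround keeps the `simpleConsumer.close()` call after the fetcher thread has exited. It can be modelled as below. All names are hypothetical. `interruptConsumer` borrows the name Joel proposes elsewhere in the thread, and splitting shutdown into `initiateShutdown`/`completeShutdown` is an illustrative choice, not the actual `AbstractFetcherThread` code. The steps are replayed on one thread to make the ordering explicit.

```scala
// Hypothetical consumer: interruptConsumer() drops the connection without the
// lock (to unblock a stuck read); close() tears down under the lock.
class InterruptibleToyConsumer {
  private val lock = new Object
  @volatile var connected = false
  @volatile var isClosed = false

  def connect(): Unit = lock.synchronized { connected = true }
  def interruptConsumer(): Unit = connected = false
  def close(): Unit = lock.synchronized { connected = false; isClosed = true }
  // An in-flight request that finds the connection gone re-creates it.
  def sendRequest(): Unit = lock.synchronized { if (!connected) connected = true }
}

// Hypothetical fetcher whose shutdown is split into its two halves.
class ToyFetcher(consumer: InterruptibleToyConsumer) {
  @volatile var isRunning = true

  def initiateShutdown(): Unit = {
    isRunning = false
    consumer.interruptConsumer() // unblock a read that may be stuck
  }

  // Runs after the fetcher thread has exited. The retained close() removes
  // any connection re-created between interruptConsumer() and thread exit.
  def completeShutdown(): Unit = consumer.close()
}
```

The retained `close()` is mostly redundant in the common case. It matters only when a request slips in between the interrupt and the thread's exit, which is the sequence described above.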
Re: Review Request 34965: Patch for KAFKA-2241
Posted by Dong Lin <li...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34965/
-----------------------------------------------------------
(Updated June 3, 2015, 10:30 p.m.)
Review request for kafka.
Bugs: KAFKA-2241
https://issues.apache.org/jira/browse/KAFKA-2241
Repository: kafka
Description
-------
KAFKA-2241; AbstractFetcherThread.shutdown() should not block on ReadableByteChannel.read(buffer)
Diffs (updated)
-----
core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0
Diff: https://reviews.apache.org/r/34965/diff/
Testing
-------
Thanks,
Dong Lin