You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Andrii Biletskyi <an...@stealth.ly> on 2015/03/16 16:50:28 UTC

Question about concurrency during Log config change

Hi all,

I was looking through the code related to "dynamic Log config change"
feature and
noticed the way we deal with concurrency there. I have a question about it.

The Log class holds volatile LogConfig property, almost all methods in
Log.scala are
synchronized on private lock object. But the code in TopicConfigManager
(
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
)
which "substitutes" Log's logConfig is not synchronized.

Code execution example:
Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288

Thread 2: handles log config change -> TopicConfigManager:108 (see above)
substitutes
log's config - changes *maxMessageSize* and *segmentSize*

Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and
pickups updated
config setting
https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299

So looks like we accessed object in partial "state" - in scope of one
procedure
(Log.append) we took one setting from the old state (maxMessageSize), and
the other
one from the updated state.

Methods in Log are synchronized, as mentioned above. But logConfig is only
volatile
which solves visibility problems but doesn't prevent it from being changed
in other
thread, as I understand.

Am I missing something here?

Thanks,
Andrii Biletskyi

Re: Question about concurrency during Log config change

Posted by Andrii Biletskyi <an...@stealth.ly>.
Jay,

Thanks for quick response. Yes, this might be not that harmful for users,
I'm not sure about that. But it definitely looks like data race. Your
solution
is simple and should work, hard to tell promptly when it's about
concurrency.

Initially I was looking through this code to understand whether we can
inherit
this approach for Global Brokers Config. In this case your solution will be
harder
to implement since we access broker's config in many-many different places.
But that's another story.

Thanks,
Andrii Biletskyi

On Mon, Mar 16, 2015 at 5:56 PM, Jay Kreps <ja...@gmail.com> wrote:

> You are correct. Each read will be a valid value but there is no guarantee
> that subsequent reads will read from the same config. I don't think that is
> a problem, do you? If we want to strengthen the guarantee we can grab the
> config once in the method
>    val config = log.config
> and then do however many accesses against that variable which will remain
> constant even if the config is updated in the course of the method.
>
> -Jay
>
> On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi <
> andrii.biletskyi@stealth.ly> wrote:
>
> > Hi all,
> >
> > I was looking through the code related to "dynamic Log config change"
> > feature and
> > noticed the way we deal with concurrency there. I have a question about
> it.
> >
> > The Log class holds volatile LogConfig property, almost all methods in
> > Log.scala are
> > synchronized on private lock object. But the code in TopicConfigManager
> > (
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
> > )
> > which "substitutes" Log's logConfig is not synchronized.
> >
> > Code execution example:
> > Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
> >
> > Thread 2: handles log config change -> TopicConfigManager:108 (see above)
> > substitutes
> > log's config - changes *maxMessageSize* and *segmentSize*
> >
> > Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and
> > pickups updated
> > config setting
> >
> >
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
> >
> > So looks like we accessed object in partial "state" - in scope of one
> > procedure
> > (Log.append) we took one setting from the old state (maxMessageSize), and
> > the other
> > one from the updated state.
> >
> > Methods in Log are synchronized, as mentioned above. But logConfig is
> only
> > volatile
> > which solves visibility problems but doesn't prevent it from being
> changed
> > in other
> > thread, as I understand.
> >
> > Am I missing something here?
> >
> > Thanks,
> > Andrii Biletskyi
> >
>

Re: Question about concurrency during Log config change

Posted by Jay Kreps <ja...@gmail.com>.
You are correct. Each read will be a valid value but there is no guarantee
that subsequent reads will read from the same config. I don't think that is
a problem, do you? If we want to strengthen the guarantee we can grab the
config once in the method
   val config = log.config
and then do however many accesses against that variable which will remain
constant even if the config is updated in the course of the method.

-Jay

On Mon, Mar 16, 2015 at 8:50 AM, Andrii Biletskyi <
andrii.biletskyi@stealth.ly> wrote:

> Hi all,
>
> I was looking through the code related to "dynamic Log config change"
> feature and
> noticed the way we deal with concurrency there. I have a question about it.
>
> The Log class holds volatile LogConfig property, almost all methods in
> Log.scala are
> synchronized on private lock object. But the code in TopicConfigManager
> (
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/server/TopicConfigManager.scala#L108
> )
> which "substitutes" Log's logConfig is not synchronized.
>
> Code execution example:
> Thread 1: Log.append -> Log:288 config.*maxMessageSize* is accessed
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L288
>
> Thread 2: handles log config change -> TopicConfigManager:108 (see above)
> substitutes
> log's config - changes *maxMessageSize* and *segmentSize*
>
> Thread 1: Log.append Log:299 - code accesses config.*segmentSize* and
> pickups updated
> config setting
>
> https://github.com/apache/kafka/blob/7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4/core/src/main/scala/kafka/log/Log.scala#L299
>
> So looks like we accessed object in partial "state" - in scope of one
> procedure
> (Log.append) we took one setting from the old state (maxMessageSize), and
> the other
> one from the updated state.
>
> Methods in Log are synchronized, as mentioned above. But logConfig is only
> volatile
> which solves visibility problems but doesn't prevent it from being changed
> in other
> thread, as I understand.
>
> Am I missing something here?
>
> Thanks,
> Andrii Biletskyi
>