Posted to dev@kafka.apache.org by Chittaranjan Hota <ch...@gmail.com> on 2015/01/21 22:58:29 UTC

NIO and Threading implementation

Hello,
Congratulations to the folks behind Kafka. It has been a smooth ride
dealing with multi-TB data, where the same setup on JMS often fell apart.

Although I have been using Kafka for more than a few days now, I only
started looking into the code base yesterday, and I already have questions
about the very beginning. I would appreciate some input on why the
implementation is done the way it is.

Version : 0.8.1

THREADING RELATED
1. Why is the startup code synchronized? Who are the competing threads?
    a. The startReporters function is synchronized.
    b. KafkaScheduler startup is synchronized, yet a volatile variable is
also declared, even though the synchronized block itself already
guarantees "happens-before".
    c. A plain new Thread is used instead of relying on an ExecutorService.
    d. The processor threads use a CountDownLatch, but the main thread
doesn't await the processors' signal that startup is complete.
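
On 1b, one detail worth keeping in mind: a synchronized block only creates a
happens-before edge between threads that lock the same monitor. If some
method reads the field without taking the lock, the volatile is what makes
that read safe; if every access is synchronized, the volatile is redundant.
A minimal Java sketch of the first pattern (VolatileFlagSketch and its
methods are hypothetical, not Kafka's code):

```java
// Hypothetical sketch (not Kafka's code): writes happen under the instance
// lock, but isStarted() reads the flag without locking.
class VolatileFlagSketch {
    // volatile: gives lock-free readers a happens-before edge with the
    // synchronized write below. Drop it only if every read is synchronized.
    private volatile boolean started = false;
    private int startCalls = 0; // guarded by "this"

    synchronized void startup() {
        if (started) {
            return; // idempotent: later callers see the flag and do nothing
        }
        startCalls++;
        started = true;
    }

    // No lock taken here, so correctness relies on the volatile field.
    boolean isStarted() {
        return started;
    }

    synchronized int startCalls() {
        return startCalls;
    }
}
```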


NIO RELATED
2.
    a. The Acceptor and each Processor thread have their own selector (since
they extend the abstract class AbstractServerThread). Ideally a single
selector suffices for multiplexing. Is there any reason why multiple
selectors are used?
    b. The selector wakeup calls made by Processors in the read method (line
362 of SocketServer.scala) are MISSED calls, since no thread is waiting on
the select at that point.

Looking forward to learning the code further!
Thanks in advance.

Regards,
Chitta

Re: NIO and Threading implementation

Posted by Jay Kreps <ja...@gmail.com>.
Hey Chittaranjan,

Yeah, I think at a high level our goal is that classes are either
threadsafe or not, and if threadsafe, their safety doesn't depend on the
details of their current usage(s), since that often changes. In other
words, the synchronization should be encapsulated in the class. So that's
the goal. I've often seen this drift over time as code gets refactored. If
you see issues, definitely send a patch. :-)
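
A minimal Java sketch of "synchronization encapsulated in the class", in the
whole-class style Jay mentions for KafkaScheduler: every method that touches
the mutable executor field takes the instance lock, so no field needs to be
volatile and callers need no external locking. SimpleScheduler and its
methods are hypothetical; only ScheduledThreadPoolExecutor is a real JDK
class.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduler: thread safety lives inside the class, so code that
// embeds the server can call it from any thread.
class SimpleScheduler {
    private final int threads;
    private ScheduledThreadPoolExecutor executor = null; // guarded by "this"

    SimpleScheduler(int threads) {
        this.threads = threads;
    }

    synchronized void startup() {
        if (executor != null) {
            throw new IllegalStateException("scheduler already started");
        }
        executor = new ScheduledThreadPoolExecutor(threads);
        // Drop delayed tasks that have not run yet once we shut down.
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    synchronized boolean isStarted() {
        return executor != null;
    }

    synchronized ScheduledFuture<?> schedule(Runnable task, long delayMs) {
        if (executor == null) {
            throw new IllegalStateException("scheduler not started");
        }
        return executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() throws InterruptedException {
        ScheduledThreadPoolExecutor toStop;
        synchronized (this) { // swap the field out under the lock...
            toStop = executor;
            executor = null;
        }
        // ...but wait outside it, so a running task that calls back into
        // the scheduler cannot deadlock against shutdown().
        if (toStop != null) {
            toStop.shutdown();
            toStop.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```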

For the socket server code: yes, if that works, let's definitely remove
the unneeded wakeups. It would be interesting to trace back the ancestry
of that code and figure out how it ended up like that (it may have always
been that way).

For the selectors, what I was saying is this. We need to have multiple
threads doing network I/O. This helps scale that CPU load, and since we
use sendfile, which is blocking, we also need threads to absorb that load.
So we need multiple I/O threads. There are two ways I know to do this: (1)
give each thread a selector and register accepted sockets with one of
these (at random or round-robin), or (2) have a single selector and
attempt to share it between threads. Theoretically (2) could have
advantages because if one thread is blocked, others will pick up its work.
However, there are a ton of issues in making this correct and efficient.
So basically I was advocating (1) as being better than (2).
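
Option (1) can be sketched in Java roughly as follows. This is a simplified
model under stated assumptions, not Kafka's SocketServer: PerThreadSelectors,
Processor, runOnce and acceptRoundRobin are hypothetical names. Each Processor
owns a private Selector; the acceptor never registers a channel on another
thread's selector directly (on the JDKs of that era, register() could block
while the owner sat inside select()). Instead it enqueues the channel and
calls wakeup(), and the owning thread registers it on its next iteration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of option (1): one Selector per I/O thread, with
// accepted sockets handed out round-robin.
class PerThreadSelectors {

    static final class Processor {
        final Selector selector;
        private final Queue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();

        Processor() throws IOException {
            selector = Selector.open();
        }

        // Called on the acceptor thread: only enqueue, then wake the owner so
        // it does not sit in select() until its timeout expires.
        void accept(SocketChannel channel) {
            newConnections.add(channel);
            selector.wakeup();
        }

        // One iteration of the owning thread's loop; only this thread touches
        // the selector's keys. Returns the number of registered channels.
        int runOnce(long selectTimeoutMs) throws IOException {
            SocketChannel channel;
            while ((channel = newConnections.poll()) != null) {
                channel.configureBlocking(false); // required before register()
                channel.register(selector, SelectionKey.OP_READ);
            }
            selector.select(selectTimeoutMs);
            // Real code would process selector.selectedKeys() here.
            selector.selectedKeys().clear();
            return selector.keys().size();
        }
    }

    // Acceptor side: hand `count` accepted sockets to the processors in turn.
    static void acceptRoundRobin(ServerSocketChannel server, Processor[] processors, int count)
            throws IOException {
        int next = 0;
        for (int i = 0; i < count; i++) {
            processors[next].accept(server.accept()); // blocking accept for simplicity
            next = (next + 1) % processors.length;
        }
    }
}
```

The queue-plus-wakeup handoff is what keeps every selector single-threaded:
the acceptor thread never touches another thread's key set.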

-Jay

In reply to Chittaranjan Hota <ch...@gmail.com>, Wed, Jan 21, 2015 at 10:46 PM.

Re: NIO and Threading implementation

Posted by Chittaranjan Hota <ch...@gmail.com>.
Thanks for your comments Jay.

Quote "Technically startup is not called from multiple threads, but the
class's correctness should not depend on the current usage, so it should
work correctly if it were." --> If this is a requirement, then one can see
that many methods are NOT thread-safe during startup. If we want to stick
to the goal of letting other parent applications initialize Kafka, a few
things have to change. Nevertheless, I am currently making some changes on
my local copy and will sync back with you once I see how things look.

For the other couple of things (removing the wakeup and correctly adding
the awaits), I made the changes locally, deployed them to our stage
cluster (3 brokers, 3 ZK nodes), and ran some load tests today.

I am not sure I understood what "single threaded selector loop" means, or
the point about locking in selector loops; I would love to have a
conversation with you around this.

Thanks again.

In reply to Jay Kreps <ja...@gmail.com>, Wed, Jan 21, 2015 at 2:15 PM.

Re: NIO and Threading implementation

Posted by Jay Kreps <ja...@gmail.com>.
1. a. I think startup is a public method on KafkaServer, so for people
embedding Kafka in some way this helps guarantee correctness.
b. I think KafkaScheduler tries to be a bit too clever; there is a patch
out there that just moves to global synchronization for the whole class,
which is easier to reason about. Technically startup is not called from
multiple threads, but the class's correctness should not depend on the
current usage, so it should work correctly if it were.
c. I think in cases where you actually just want to start and run N
threads, using Thread directly is sensible. ExecutorService is useful but
does have a ton of gadgets and gizmos that obscure the basic usage in that
case.
d. Yeah, we should probably wait until the processor threads start as well.
I think it probably doesn't cause misbehavior as is, but it would be better
if the postcondition of startup were that all threads had started.
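
Points c and d combine naturally: start N plain Threads and have startup()
block on a CountDownLatch, so "all threads have started" becomes a
postcondition of startup(). A minimal sketch; StartupSketch and its methods
are hypothetical, and the worker body is a stand-in for a real processor
loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: startup() is safe to call from several threads and
// only returns once every worker thread is actually running.
class StartupSketch {
    private final int numWorkers;
    private final List<Thread> workers = new ArrayList<>(); // guarded by "this"
    private final CountDownLatch started;
    private final CountDownLatch stop = new CountDownLatch(1);
    private boolean startupCalled = false; // guarded by "this"; single-use object
    private boolean running = false;       // guarded by "this"

    StartupSketch(int numWorkers) {
        this.numWorkers = numWorkers;
        this.started = new CountDownLatch(numWorkers);
    }

    synchronized void startup() throws InterruptedException {
        if (startupCalled) {
            return; // a second (or concurrent) call is a no-op
        }
        startupCalled = true;
        for (int i = 0; i < numWorkers; i++) {
            // Plain Thread: we just want N long-lived loops, no task queue.
            Thread t = new Thread(() -> {
                started.countDown(); // signal "this worker is running"
                try {
                    stop.await();    // stand-in for the real processing loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "worker-" + i);
            t.setDaemon(true);
            workers.add(t);
            t.start();
        }
        started.await(); // postcondition: all workers have started
        running = true;
    }

    synchronized boolean isRunning() {
        return running;
    }

    synchronized int workerCount() {
        return workers.size();
    }

    long notYetStarted() {
        return started.getCount();
    }

    synchronized void shutdown() throws InterruptedException {
        if (!running) {
            return;
        }
        stop.countDown();
        for (Thread t : workers) {
            t.join();
        }
        running = false;
    }
}
```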

2. a. There are different ways to do this. My overwhelming experience has
been that any attempt to share a selector across threads is very painful.
Making the selector loops single-threaded really simplifies everything, and
the performance also tends to be a lot better because there is far less
locking inside that selector loop.
b. Yeah, I share your skepticism of that call. I'm not sure why it is there
or if it is needed. I agree that wakeup should only be needed from other
threads. It would be good to untangle that mystery. I wonder what happens
if it is removed.
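
On 2b, it may help to know what Selector.wakeup() does when nobody is
selecting. According to its Javadoc, the call is not dropped: if no
selection is in progress, the next select() returns immediately, and
several wakeups before one select count as one. So an unneeded wakeup
likely costs an extra, empty trip around the loop rather than being lost.
A minimal sketch that measures this (WakeupDemo is a hypothetical name;
the timings are wall-clock, so treat them as approximate):

```java
import java.io.IOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Issues wakeup() while no thread is selecting, then times two selects.
    // Returns {firstSelectMs, secondSelectMs}.
    static long[] timeSelectsAfterEarlyWakeup(long timeoutMs) throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();           // no select() in progress yet
            selector.wakeup();           // a second call adds nothing extra
            long t0 = System.nanoTime();
            selector.select(timeoutMs);  // consumes the pending wakeup: returns at once
            long t1 = System.nanoTime();
            selector.select(timeoutMs);  // nothing pending: waits about timeoutMs
            long t2 = System.nanoTime();
            return new long[] {(t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000};
        }
    }
}
```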

-Jay

In reply to Chittaranjan Hota <ch...@gmail.com>, Wed, Jan 21, 2015 at 1:58 PM.