Posted to users@kafka.apache.org by Eric Sammer <es...@scalingdata.com> on 2014/05/09 22:19:33 UTC

New consumer APIs

All:

I've been going over the new consumer APIs and it seems like we're
squishing a lot of different concerns together into a single class. The
scope of the new Consumer is kind of all over the place. Managing the
lifecycle - and especially the thread safety - seems challenging.
Specifically, Consumer seems to serve the following purposes:
* Acts as a holder of subscription info (e.g. subscribe()).
* Acts as a stream (e.g. poll(), seek()).

I definitely think we want these to be separate. It's pretty common to have
a consumer process that connects to the broker, creates N consumer threads,
each of which works on a single stream (which could be composed of some
number of partitions). In this scenario, you *really* want to explicitly
control durability (e.g. commit()s) on a per-stream basis. You also have
different lifecycle semantics and thread safety concerns at the stream
level versus the global level. Is there a reason the API doesn't look more
like:

// Thread safe, owns the multiplexed connection
Consumer:
  def subscribe(topic: String, streams: Int): Set[Stream]
  def close() // Release everything

// Not at all thread safe, no synchronization.
Stream:
  def commit() // Really important this be here and not on Consumer.
  def seek(...)
  def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
  def close() // Release these partitions
  ...

I think this also significantly reduces the complexity of the Consumer API
and lets each thread in a consumer process handle stream lifecycle
appropriately. Since the connection is multiplexed and things could get
rebalanced, just toss an exception if the streams become invalid, forcing a
resubscribe. That way we don't have crazy state logic.
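
To make the threading story concrete, here's roughly what a worker thread
might look like against the sketch above (Java-ish, and to be clear:
Stream, MessageThingie, StreamInvalidatedException, process(), and running
are all made-up names from this proposal, not real API):

// One Stream per thread: no shared state, no synchronization.
void run(Stream stream) {
  try {
    while (running) {
      List<MessageThingie> batch = stream.poll(100, TimeUnit.MILLISECONDS);
      for (MessageThingie message : batch) {
        process(message); // e.g. write it somewhere durable
      }
      stream.commit(); // commit scoped to this stream's partitions only
    }
  } catch (StreamInvalidatedException e) {
    // rebalance invalidated this stream; fall out and resubscribe
  } finally {
    stream.close(); // release this stream's partitions
  }
}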

I'm sure I'm missing something, but I wanted to toss this out there for
folks to poke at.
(p.s. I *really* want per-stream commit baked into the API.)
-- 
E. Sammer
CTO - ScalingData

Re: New consumer APIs

Posted by Jay Kreps <ja...@gmail.com>.
Hey Eric,

Yeah, this is more similar to what we currently have, but with a richer API
than a simple Iterator.

I think the question is how the poll() on the various streams translates
into the ultimate poll that we need to do against the individual socket
connections.

Some of the things that make the current api not ideal are the following:
1. Currently we have fetcher threads that do network I/O and then blocking
queues that feed iterators (the equivalent of the Stream in your proposal).
However, it is a bit unfortunate to have the client dictate a particular
threading model. One thing we liked about the current proposal was that it
is all one thread: it only does I/O when you call poll(). If the user then
wants to feed these into queues that go to thread pools, that is fine but
not required (there's a sketch of this below). In a sense the client is
sort of a lower-level building block--I think there are actually a couple
of higher-level APIs that could be built on top of this (such as the Java 8
Stream stuff or something that manages a thread pool of processors for
you). We thought about some of these higher-level APIs, and though they are
definitely more convenient for certain uses, they are not as general.
2. There are also a ton of gotchas in terms of cleanly re-assigning
partitions. You need some "safe point" at which the user isn't processing
any more. In the current API when the user calls poll() the meaning of that
call is that all previous messages have been processed. Hence you can
commit offsets (if using autocommit) or reassign partitions transparently.
This gets a bit more complex if there are many poll()s with different time
windows (though perhaps still possible).
3. We would have to figure out how unioning two streams would work. You
need to have a way of polling over all the streams for things that consume
multiple inputs.
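
To make point 1 concrete (the sketch promised above): the consumer itself
stays single-threaded, and fanning out to a pool of workers is something
the user layers on top. This is only a sketch; Consumer, Record, poll(),
process(), and running are placeholder names, not the actual proposed API:

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// One thread owns the consumer and does all network I/O inside poll();
// workers only ever see batches handed to them through the queue.
void runPipeline(Consumer consumer, int numWorkers) throws InterruptedException {
  BlockingQueue<List<Record>> queue = new LinkedBlockingQueue<>(100);
  ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
  for (int i = 0; i < numWorkers; i++) {
    workers.submit(() -> {
      try {
        while (true) {
          List<Record> batch = queue.take(); // blocks until work arrives
          for (Record record : batch) process(record);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // treat as shutdown
      }
    });
  }
  while (running) {
    List<Record> batch = consumer.poll(100); // the only place I/O happens
    if (!batch.isEmpty()) queue.put(batch);
  }
}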

Not sure how much of that makes sense...

-Jay



Re: New consumer APIs

Posted by Jun Rao <ju...@gmail.com>.
Eric,

With the new proposal, it seems what you want can be achieved by just
instantiating N Consumer instances. Then, you can wrap each in a thread and
call poll() in it. Will that work for you?
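
Something along these lines (a sketch only; the Consumer constructor,
subscribe(), poll(), and Record are illustrative stand-ins for whatever
the final API looks like):

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One Consumer instance per thread, so nothing is shared across threads.
void startConsumers(int numThreads, Properties config) {
  ExecutorService pool = Executors.newFixedThreadPool(numThreads);
  for (int i = 0; i < numThreads; i++) {
    pool.submit(() -> {
      Consumer consumer = new Consumer(config); // private to this thread
      consumer.subscribe("mytopic");
      while (running) {
        for (Record record : consumer.poll(100)) {
          process(record);
        }
        consumer.commit(); // commits only this instance's partitions
      }
      consumer.close();
    });
  }
}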

Thanks

Jun



Re: New consumer APIs

Posted by Eric Sammer <es...@scalingdata.com>.
Neha:

Here's the basic pseudocode of the process acting as the Kafka consumer:

executor = Executors.newFixedThreadPool(numberOfThreads)
consumer = // get a handle to the broker.
mytopicStreams = consumer.getStreams({ "mytopic" => numberOfThreads
}).get("mytopic")

for (stream : mytopicStreams) {
  executor.submit(() => {
    i = 0;
    for (message : stream) {
      writeMessageToSomething(message);
      i++
      if (i % 1000 == 0) {
        commitSomething()
        // I understand I can get dupes as a result of this.
        stream.commit()
      }
    }
  })
}

You get the idea. I want to be able to indicate that thread N (which has
stream N which is made up of partitions a..z) is in a reasonable state to
commit; that the messages it has consumed are "on disk." Specifically, I
don't want to have to synchronize all threads when I commit, nor do I want
to enumerate the partitions in each stream to commit them individually.
From a library perspective, it's pretty obvious that the stream should
manage the offsets for each underlying partition.

--
E. Sammer
CTO - ScalingData

Re: New consumer APIs

Posted by Neha Narkhede <ne...@gmail.com>.
>> (p.s. I *really* want per-stream commit baked into the API.)

Assuming that you mean being able to control commit() per partition, then
yes. This is included. You can see some code examples here to get a better
idea of how that can be used:
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html

However, I'd also like to understand your use case better. Are you trying
to use the consumer for group management (auto rebalancing) and at the same
time, control per partition commit()? In general, before proposing any
changes, I'd like to understand what you are hoping to achieve with the
consumer APIs.

Thanks,
Neha



Re: New consumer APIs

Posted by Timothy Chen <tn...@gmail.com>.
Hi Neha,

Yes, a way that allows each partition to be committed separately.

Couldn't remember if the new consumer allows it, but looks like it does!

Tim


Re: New consumer APIs

Posted by Neha Narkhede <ne...@gmail.com>.
Tim,

I'm going to ask you the same question :-)

By "per stream commit", do you mean a per partition commit like this API -

public OffsetMetadata commit(Map<TopicPartition, Long> offsets);

This API allows the consumer to commit the specified offsets only for
selected partitions.
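
For example, usage could look like this (illustrative only; the topic name,
partition numbers, and offsets are made up):

import java.util.HashMap;
import java.util.Map;

// Commit offsets for two specific partitions; any other partitions this
// consumer owns are left untouched.
Map<TopicPartition, Long> offsets = new HashMap<>();
offsets.put(new TopicPartition("mytopic", 0), 3001L);
offsets.put(new TopicPartition("mytopic", 1), 2500L);
OffsetMetadata metadata = consumer.commit(offsets);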

Thanks,
Neha



Re: New consumer APIs

Posted by Timothy Chen <tn...@gmail.com>.
Also going to add that I know a per-stream commit is a strong requirement for folks I know using Kafka, and I've seen custom code written just to do so.

Tim


Re: New consumer APIs

Posted by Jun Rao <ju...@gmail.com>.
Eric,

Thanks for the feedback. It seems that due to an Apache mail server issue,
your email from 6 days ago just got delivered.

A quick answer to your question is that, for simplicity, the proposed API
implicitly assumes there is only one stream returned by each Consumer, and
therefore seek/commit/poll are all tied to that stream. If a client wants
more parallelism, it could (1) create multiple Consumer instances, or (2)
feed the consumed messages into another threadpool (if ordering is not
important).

Jun

