Posted to dev@kafka.apache.org by Jay Kreps <ja...@confluent.io> on 2015/01/12 04:57:09 UTC

New consumer client

I uploaded an updated version of the new consumer client (
https://issues.apache.org/jira/browse/KAFKA-1760). This is now almost
feature complete, and has pretty reasonable testing and metrics. I think it
is ready for review and could be checked in once 0.8.2 is out.

For those who haven't been following, this is meant to be a new consumer
client, like the new producer in 0.8.2, and is intended to replace the
existing "high level" and "simple" Scala consumers.

This still needs the server-side implementation of the partition assignment
and group management to be fully functional. I have just stubbed this out
in the server to allow the implementation and testing of the client, but
actual usage will require it. However, the client that exists now is
actually a fully functional replacement for the "simple consumer" that is
vastly easier to use correctly, as it internally does all the discovery and
failover.

It would be great if people could take a look at this code, and
particularly at the public APIs, which have several small changes from the
original proposal.

Summary

What's there:
1. Simple consumer functionality
2. Offset commit and fetch
3. Ability to change position with seek
4. Ability to commit all or just some offsets
5. Controller discovery, failure detection, heartbeat, and fail-over
6. Controller partition assignment
7. Logging
8. Metrics
9. Integration tests including tests that simulate random broker failures
10. Integration into the consumer performance test
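
As a rough illustration of items 3 and 4 (seeking and committing only some
offsets), here is a minimal in-memory simulation. The MockConsumer class and
its method shapes are made up for illustration; this is not the real client
API:

```python
# Hypothetical in-memory sketch of seek / partial-commit semantics.
# NOT the real consumer API -- names and shapes are illustrative only.
class MockConsumer:
    def __init__(self, log):
        self.log = log                          # partition -> list of messages
        self.positions = {p: 0 for p in log}    # next offset to fetch
        self.committed = {}                     # last committed offsets

    def seek(self, partition, offset):
        # Change the fetch position for a single partition.
        self.positions[partition] = offset

    def poll(self, partition, max_records=2):
        # Return the next batch and advance the position past it.
        start = self.positions[partition]
        records = self.log[partition][start:start + max_records]
        self.positions[partition] = start + len(records)
        return records

    def commit(self, offsets=None):
        # Commit an explicit subset of offsets, or everything consumed so far.
        if offsets is None:
            offsets = dict(self.positions)
        self.committed.update(offsets)

c = MockConsumer({"t-0": ["a", "b", "c", "d"]})
c.poll("t-0")            # consumes "a", "b"; position is now 2
c.commit({"t-0": 1})     # commit only part of what was consumed
c.seek("t-0", 0)         # rewind and re-read from the beginning
```

The point of the sketch is just the semantics: poll() advances a
per-partition position, commit() can take an explicit subset of offsets or
default to everything consumed, and seek() rewinds for re-reading.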

Limitations:
1. There could be some lingering bugs in the group management support; it
is hard to test fully with just the stub support on the server, so we'll
need to get the server working to do better.
2. I haven't implemented wild-card subscriptions yet.
3. No integration with console consumer yet

Performance

I did some performance comparisons with the old consumers over localhost on
my laptop. Usually localhost isn't good for testing, but in this case it is
useful because it has near-infinite bandwidth, so it does a good job of
catching inefficiencies that would be hidden by a slower network. These
numbers probably aren't representative of what you would get over a real
network, but they help bring out the relative efficiencies.
Here are the results:
- Old high-level consumer: 213 MB/sec
- New consumer: 225 MB/sec
- Old simple consumer: 242 MB/sec

It may be hard to get this client up to the same point as the simple
consumer, since the simple consumer does very little beyond allocating and
wrapping byte buffers that it reads off the network.

The big thing that shows up in profiling is the buffer allocation for
reading data. So one speed-up would be to pool these.
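
A pool along those lines could be a simple free list of fixed-size buffers.
The following is an illustrative sketch; the names and sizes are
assumptions, not the actual client code:

```python
from collections import deque

class BufferPool:
    """Reuse fixed-size read buffers instead of allocating one per fetch."""

    def __init__(self, buffer_size, max_buffers):
        self.buffer_size = buffer_size
        self.max_buffers = max_buffers
        self.free = deque()          # buffers available for reuse

    def allocate(self):
        # Hand out a pooled buffer when one is available, else allocate fresh.
        if self.free:
            return self.free.popleft()
        return bytearray(self.buffer_size)

    def release(self, buf):
        # Return the buffer for reuse, dropping it if the pool is full.
        if len(self.free) < self.max_buffers:
            self.free.append(buf)

pool = BufferPool(buffer_size=64 * 1024, max_buffers=8)
buf = pool.allocate()            # first use allocates
pool.release(buf)
assert pool.allocate() is buf    # second use reuses the same buffer
```

The win is avoiding a fresh allocation (and the resulting GC pressure) on
every fetch in the hot read path.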

Some things to discuss

1. What should the behavior of consumer.position() and consumer.committed()
be immediately after initialization (prior to calling poll())? Currently
these methods just fetch the current value from memory, but if the position
isn't in memory they will try to fetch it from the server, and if no
position is found there they will use the auto-offset reset policy to pick
one. I think this is the right thing to do, because otherwise you can't
guarantee how many calls to poll() would be required before full
initialization is complete. But it is kind of weird.
2. Overall code structure improvement. These NIO network clients tend to be
very imperative in nature. I'm not sure this is bad, but if anyone has any
ideas on improving the code I'd love to hear them.
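
The lookup order described in point 1 can be sketched as follows. This is
an illustrative simulation, not the actual client code; fetch_committed and
the policy names are stand-ins:

```python
def resolve_position(partition, in_memory, fetch_committed,
                     reset_policy, log_start, log_end):
    """Resolve a consumer position: memory first, then the server,
    then fall back to the auto-offset reset policy."""
    # 1. A position already held in memory wins.
    if partition in in_memory:
        return in_memory[partition]
    # 2. Otherwise ask the server for a committed offset.
    committed = fetch_committed(partition)
    if committed is not None:
        return committed
    # 3. No committed offset anywhere: apply the reset policy.
    if reset_policy == "earliest":
        return log_start
    if reset_policy == "latest":
        return log_end
    raise LookupError(f"no position for {partition} and no reset policy")

# A dict stands in for the server-side offset store.
server = {"t-1": 42}
pos = resolve_position("t-0", {}, server.get, "earliest",
                       log_start=0, log_end=100)   # falls back to earliest
```

Making position() do this eagerly means the caller gets a defined answer
right away, rather than having to guess how many poll() calls it takes.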

-Jay

Re: New consumer client

Posted by Jay Kreps <ja...@gmail.com>.
What this means is that the CRC that was stored with the data no longer
matches. There could be two reasons for this:
1. We have some rare bug in the CRC calculation or check logic or are
otherwise corrupting data in flight
2. You have disk or network corruption which changed the bits.

To diagnose:
1. Wait till this occurs in a consumer
2. Check that reading that message fails when it is done manually using
kafka-simple-consumer-shell.sh
3. Check that the message is in fact corrupt when we directly dump that
file using bin/kafka-run-class.sh kafka.tools.DumpLogSegments

If (2) and (3) both show the message is corrupt, then you have disk
corruption. If not, then it could be a bug.
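
For reference, the check itself amounts to recomputing a CRC32 over the
record and comparing it with the stored value. A minimal sketch,
illustrative only and not Kafka's actual record layout:

```python
import zlib

def make_record(payload: bytes):
    # Store a CRC32 alongside the payload, as a broker would on append.
    return zlib.crc32(payload), payload

def check_record(stored_crc: int, payload: bytes) -> bool:
    # The consumer recomputes the CRC and compares; a mismatch means the
    # bits changed somewhere between being written and being read.
    return zlib.crc32(payload) == stored_crc

crc, data = make_record(b"hello")
assert check_record(crc, data)
assert not check_record(crc, b"hellp")  # one flipped byte fails the check
```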

-Jay


Re: New consumer client

Posted by Steven Wu <st...@gmail.com>.
Jay, we have observed CRC corruption occasionally too. I reported it in
another thread and asked how we should handle some error conditions from
the old high-level consumer.


Re: New consumer client

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jay,

1) Sorry to get back to you so late.  It is a CRC check error on any
consumer thread, regardless of the server.  What happens now is that I have
to catch this exception and skip the message; there is no option to
re-fetch it.  Is there any way to add behavior to the Java consumer to
re-fetch the offset that failed the CRC check?
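
One way to approximate re-fetch behavior is to catch the checksum error,
seek back to the failing offset, and poll again, which gives transient
(e.g. network) corruption a chance to clear. A sketch against a
hypothetical consumer interface, simulated here in memory:

```python
class ChecksumError(Exception):
    pass

class FlakyConsumer:
    """Stand-in consumer whose first N polls raise a checksum error."""
    def __init__(self, fail_times):
        self.fail_times = fail_times
        self.pos = 0
    def position(self, partition):
        return self.pos
    def seek(self, partition, offset):
        self.pos = offset
    def poll(self, partition):
        if self.fail_times > 0:
            self.fail_times -= 1
            raise ChecksumError("bad CRC")
        self.pos += 1
        return ["msg"]

def poll_with_refetch(consumer, partition, retries=3):
    """Retry a fetch whose CRC check failed, re-seeking to the same offset."""
    offset = consumer.position(partition)
    for _ in range(retries):
        try:
            return consumer.poll(partition)
        except ChecksumError:
            consumer.seek(partition, offset)  # rewind to retry the same offset
    raise ChecksumError(f"offset {offset} still corrupt after {retries} tries")

consumer = FlakyConsumer(fail_times=2)
assert poll_with_refetch(consumer, "t-0") == ["msg"]
```

If the corruption is on disk rather than transient, the retry will never
succeed, so a bounded retry count with a final skip-or-fail decision is
still needed.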


2) Secondly, can you please add default behavior to automatically set
'fetch.message.max.bytes' to the broker's message.max.bytes?  This would
ensure smooth configuration for both the simple and high-level consumers
and take the burden of configuring this property off the Kafka user.  We
had a lag issue due to this misconfiguration and dropped messages on the
Camus side (Camus has a different setting for its simple consumer).  It
would be great to auto-configure this if the user did not supply it.
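
The defaulting described in #2 could work roughly like this, with a plain
dict standing in for the broker config the client would have to fetch (the
function name and fallback default are made up for illustration):

```python
def effective_fetch_size(consumer_config, broker_config,
                         default=1024 * 1024):
    """Pick fetch.message.max.bytes: an explicit user setting wins,
    otherwise follow the broker's message.max.bytes so that no message
    the broker accepts is too large for the consumer to fetch."""
    user_value = consumer_config.get("fetch.message.max.bytes")
    if user_value is not None:
        return user_value
    return broker_config.get("message.max.bytes", default)

# Unset by the user: follow the broker so large messages still fit.
assert effective_fetch_size({}, {"message.max.bytes": 10_000_000}) == 10_000_000
# An explicit user setting always takes precedence.
assert effective_fetch_size({"fetch.message.max.bytes": 2_000_000},
                            {"message.max.bytes": 10_000_000}) == 2_000_000
```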

Let me know if you agree with #2.

Thanks,

Bhavesh


Re: New consumer client

Posted by Jay Kreps <ja...@gmail.com>.
Hey Bhavesh,

This seems like a serious issue and not one anyone else has reported. I
don't know what you mean by corrupt message, are you saying the CRC check
fails? If so, that check is done both by the broker (prior to appending to
the log) and the consumer so that implies either a bug in the broker or
else disk corruption on the server.

I do have an option to disable the CRC check in the consumer, though
depending on the nature of the corruption that can just lead to more
serious errors (depending on what is corrupted).

-jay


Re: New consumer client

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Jay,

One of the pain points of the existing consumer code is the occasional
CORRUPT_MESSAGE. Right now it is hard to pinpoint the cause of a
CORRUPT_MESSAGE, especially when it happens on the MirrorMaker side. Is
there any proposal to auto-skip corrupted messages and provide reporting
visibility into CRC errors (metrics, or per-topic traceability to find the
corruption, etc.)? I am not sure if this is the correct email thread for
this; if not, please let me know.

I will provide feedback about the new consumer API and changes.
Thanks,

Bhavesh
