Posted to users@kafka.apache.org by shangan chen <ch...@gmail.com> on 2013/05/31 08:31:39 UTC

question about usage of SimpleConsumer

In Kafka, consumers are responsible for maintaining state information
(the offset) about what has been consumed (see the Kafka design page).
The high-level consumer API stores its consumption state in ZooKeeper,
while the simple consumer has to handle this itself.
My question is: what happens when I call getOffsetsBefore(topic,
partition, OffsetRequest.LatestTime(), maxNumOffsets)? Where does it fetch
the offset from, given that I never stored one? It seems that Kafka
maintains the offset. Can anybody explain?


public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
    _collector = collector;
    _consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
    // ask the broker for the latest offset of this topic/partition
    long[] offsets = _consumer.getOffsetsBefore(topic, partition,
            OffsetRequest.LatestTime(), maxNumOffsets);
    offset = offsets[0];
    new StringScheme();
}

@Override
public void nextTuple() {
    // fetch the next batch, starting at the position kept in this spout
    FetchRequest fetch = new FetchRequest(topic, partition, offset, maxSize);
    ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
    for (MessageAndOffset msgAndOffset : msgSet) {
        String msg = getMessage(msgAndOffset.message());
        // log spout process time
        Debug.log(this.getClass().getSimpleName(), msg);
        Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
        _collector
                .emit(new Values(msg), new KafkaMessageId(msg, offset, 1));
        // advance the position used by the next fetch
        offset = msgAndOffset.offset();
    }
}


-- 
have a good day!
chenshang'an

Re: question about usage of SimpleConsumer

Posted by Neha Narkhede <ne...@gmail.com>.
> I mean, the simple consumer won't get the offset where it stopped, but
> the broker's latest offset.

Correct. Also each consumer in Kafka reads data independently from the
broker.
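
To illustrate the point about independent reads, here is a minimal sketch in
plain Java. It is a toy in-memory model, not the Kafka API: the names
IndependentReaders, Reader and fetch are hypothetical, and offsets are
simplified to list indexes. Each reader keeps its own position, so one reader
advancing does not move the other.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IndependentReaders {

    /** Hypothetical reader: a reference to the shared log plus its own position. */
    static final class Reader {
        private final List<String> log;
        private int nextOffset;

        Reader(List<String> log, int startOffset) {
            this.log = log;
            this.nextOffset = startOffset;
        }

        /** Returns up to maxMessages from this reader's position and advances it. */
        List<String> fetch(int maxMessages) {
            int start = Math.min(nextOffset, log.size());
            int end = Math.min(log.size(), start + maxMessages);
            List<String> batch = new ArrayList<>(log.subList(start, end));
            nextOffset = end;
            return batch;
        }

        int position() {
            return nextOffset;
        }
    }

    public static void main(String[] args) {
        // One shared log standing in for a single partition on the broker.
        List<String> log = new ArrayList<>(Arrays.asList("m0", "m1", "m2"));

        Reader a = new Reader(log, 0);
        Reader b = new Reader(log, 0);

        System.out.println("a read " + a.fetch(2));
        System.out.println("b read " + b.fetch(10));
        // The positions differ because each reader tracks its own offset.
        System.out.println("a at " + a.position() + ", b at " + b.position());
    }
}
```

Nothing about a reader's position lives in the shared log here, which mirrors
the behaviour described above: the position is the client's state.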

Thanks,
Neha
On May 31, 2013 7:44 PM, "shangan chen" <ch...@gmail.com> wrote:

> You mean the Kafka broker maintains the earliest/latest offset for each
> partition of each topic? Does this offset information have anything to do
> with consumers, or is it just the broker's own information? I mean, the
> simple consumer won't get the offset where it stopped, but the broker's
> latest offset.
>
> Another thing: if I have multiple simple consumers (in Storm there are
> multiple spouts) consuming messages from a specific partition of a specific
> topic on a single broker, will they share the same stream or consume the
> messages independently? In a test like the last example, I found that each
> consumer established its own stream rather than all consumers sharing one
> stream. That seems to confirm that the latest offset is the broker's queue
> information rather than the consumer's state.

Re: question about usage of SimpleConsumer

Posted by shangan chen <ch...@gmail.com>.
You mean the Kafka broker maintains the earliest/latest offset for each
partition of each topic? Does this offset information have anything to do
with consumers, or is it just the broker's own information? I mean, the
simple consumer won't get the offset where it stopped, but the broker's
latest offset.

Another thing: if I have multiple simple consumers (in Storm there are
multiple spouts) consuming messages from a specific partition of a specific
topic on a single broker, will they share the same stream or consume the
messages independently? In a test like the last example, I found that each
consumer established its own stream rather than all consumers sharing one
stream. That seems to confirm that the latest offset is the broker's queue
information rather than the consumer's state.
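
If a client built on the simple consumer does want to resume where it
stopped, it has to save the offset somewhere itself. Below is a minimal
sketch, assuming a local file is an acceptable store. OffsetCheckpoint is a
hypothetical helper, not part of Kafka, and the fallback passed to
loadOrDefault could be the value returned by getOffsetsBefore.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper that keeps one offset in a small text file. */
final class OffsetCheckpoint {
    private final Path file;

    OffsetCheckpoint(Path file) {
        this.file = file;
    }

    /** Overwrites the checkpoint file with the given offset. */
    void save(long offset) throws IOException {
        Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the stored offset, or fallback if no checkpoint exists yet. */
    long loadOrDefault(long fallback) throws IOException {
        if (!Files.exists(file)) {
            return fallback;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Long.parseLong(text);
    }

    public static void main(String[] args) throws IOException {
        // Create and delete a temp file so the first load finds no checkpoint.
        Path file = Files.createTempFile("offset", ".chk");
        Files.delete(file);

        OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
        long start = checkpoint.loadOrDefault(0L);
        checkpoint.save(start + 42);

        // A new instance stands in for the client after a restart.
        long resumed = new OffsetCheckpoint(file).loadOrDefault(0L);
        System.out.println("first start at " + start + ", resumed at " + resumed);

        Files.deleteIfExists(file);
    }
}
```

In a spout like the one in the original post, save would be called after a
batch has been emitted, and loadOrDefault in open().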




On Fri, May 31, 2013 at 11:49 PM, Neha Narkhede <ne...@gmail.com> wrote:

> getOffsetsBefore sends an RPC call to the Kafka brokers to find out the
> earliest/latest offset for that topic, partition. In your example, it will
> get you the latest offset at the time of the request.
>
> Thanks,
> Neha



-- 
have a good day!
chenshang'an

Re: question about usage of SimpleConsumer

Posted by Neha Narkhede <ne...@gmail.com>.
getOffsetsBefore sends an RPC to the Kafka brokers to find out the
earliest or latest offset for that topic and partition. In your example, it
returns the latest offset at the time of the request.
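
A toy model may make this concrete. The sketch below is plain Java, not the
Kafka API: ToyPartitionLog and its methods are hypothetical, and offsets are
simplified to message counters. The earliest and latest offsets are derived
only from the log's contents, so they can be answered without any consumer
having stored anything.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for one partition's log; NOT a Kafka class. */
final class ToyPartitionLog {
    private final List<String> messages = new ArrayList<>();
    private long earliest = 0; // offset of the oldest message still retained

    void append(String message) {
        messages.add(message);
    }

    /** Drops up to n of the oldest messages, as a retention policy might. */
    void expireOldest(int n) {
        int drop = Math.min(n, messages.size());
        messages.subList(0, drop).clear();
        earliest += drop;
    }

    long earliestOffset() {
        return earliest;
    }

    /** Offset that the next appended message will receive. */
    long latestOffset() {
        return earliest + messages.size();
    }

    public static void main(String[] args) {
        ToyPartitionLog log = new ToyPartitionLog();
        log.append("a");
        log.append("b");
        log.append("c");
        System.out.println("latest after 3 appends: " + log.latestOffset());

        log.expireOldest(1);
        System.out.println("earliest after expiry: " + log.earliestOffset());
        System.out.println("latest after expiry: " + log.latestOffset());
    }
}
```

A client that starts from latestOffset() only sees messages appended after
the moment it asked, which matches "the latest offset at the time of the
request".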

Thanks,
Neha

