Posted to users@kafka.apache.org by Christopher Piggott <cp...@gmail.com> on 2015/02/10 00:20:29 UTC

Need sample/example of updating offset in SimpleConsumer

Hi,

Can somebody provide an example of how to construct an
OffsetCommitRequest for a single topic/partition using SimpleConsumer from
Java?

I need both ends, really: periodically committing offsets, and also fetching
the current offset when starting up.


I can show what I'm attempting, but I'm failing to connect the objects and
constructors:


TopicAndPartition key = new TopicAndPartition(topic, shardNum);
OffsetMetadataAndError value = new OffsetMetadataAndError(offset); /* ??? */

Map<TopicAndPartition, OffsetMetadataAndError> map =
        Collections.singletonMap(key, value);

OffsetCommitRequest request = new OffsetCommitRequest(
        groupId,
        map,
        kafka.api.OffsetCommitRequest.CurrentVersion(),
        0, /* what do I do with this - correlation id? */
        clientName);
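
To make the intent concrete, the overall flow I'm after looks roughly like
the sketch below. It deliberately avoids the Kafka classes: OffsetStore,
InMemoryOffsetStore and PartitionReader are made-up names standing in for
the real commit/fetch requests, which are exactly the part I can't work out.
Storing the next offset to read (last consumed + 1) and committing every N
messages are assumptions on my part, not something taken from the Kafka docs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for wherever offsets are stored; NOT a Kafka API.
interface OffsetStore {
    void commit(String group, String topic, int partition, long offset);

    // Returns -1 when nothing has been committed yet (assumed convention).
    long fetch(String group, String topic, int partition);
}

// In-memory implementation so the sketch runs without a broker.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    @Override
    public void commit(String group, String topic, int partition, long offset) {
        offsets.put(key(group, topic, partition), offset);
    }

    @Override
    public long fetch(String group, String topic, int partition) {
        Long stored = offsets.get(key(group, topic, partition));
        return stored == null ? -1L : stored;
    }
}

// Tracks the position in one topic/partition and commits every N messages.
class PartitionReader {
    private final OffsetStore store;
    private final String group;
    private final String topic;
    private final int partition;
    private final int commitInterval;
    private long nextOffset;
    private int uncommitted;

    PartitionReader(OffsetStore store, String group, String topic,
                    int partition, int commitInterval) {
        this.store = store;
        this.group = group;
        this.topic = topic;
        this.partition = partition;
        this.commitInterval = commitInterval;
        // On startup, resume from the committed offset, or 0 if there is none.
        long committed = store.fetch(group, topic, partition);
        this.nextOffset = committed < 0 ? 0 : committed;
    }

    // Call after a message at the given offset has been fully processed.
    void onMessage(long offset) {
        nextOffset = offset + 1;
        uncommitted++;
        if (uncommitted >= commitInterval) {
            store.commit(group, topic, partition, nextOffset);
            uncommitted = 0;
        }
    }

    long position() {
        return nextOffset;
    }
}
```

What I'm missing is how to replace the two OffsetStore methods with the real
commit and fetch requests sent through SimpleConsumer.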