You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Achanta Vamsi Subhash <ac...@flipkart.com> on 2014/06/17 11:23:53 UTC
Offset of last un-consumed message
Hi,
I have a consumer group with multiple threads (high-level consumers) which
read from a topic.
I am also using a SimpleConsumer to read messages given a start offset. I
am getting the offset as the last produced message using the below code.
How to get the last un-consumed message?
public long getLastOffset(SimpleConsumer consumer, String topic, int
partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new
PartitionOffsetRequestInfo(whichTime, maxReads));
kafka.javaapi.OffsetRequest request = new
kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
LOGGER.error("Error fetching data Offset Data the Broker.
Reason: " + response.errorCode(topic, partition) );
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
--
Regards
Vamsi Subhash
Re: Offset of last un-consumed message
Posted by Jun Rao <ju...@gmail.com>.
Since the consumer returns the offset of each message, you can manage the
offset commit yourself in the application.
Thanks,
Jun
On Tue, Jun 17, 2014 at 2:24 AM, Achanta Vamsi Subhash <
achanta.vamsi@flipkart.com> wrote:
> Sorry. I want the first un-consumed message offset.
>
>
> On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash <
> achanta.vamsi@flipkart.com> wrote:
>
> > Hi,
> >
> > I have a consumer group with multiple threads (high-level consumers)
> which
> > read from a topic.
> >
> > I am also using a SimpleConsumer to read messages given a start offset. I
> > am getting the offset as the last produced message using the below code.
> > How to get the last un-consumed message?
> >
> > public long getLastOffset(SimpleConsumer consumer, String topic, int
> > partition,
> > long whichTime, String clientName) {
> > TopicAndPartition topicAndPartition = new
> TopicAndPartition(topic,
> > partition);
> > Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
> > new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
> > requestInfo.put(topicAndPartition, new
> > PartitionOffsetRequestInfo(whichTime, maxReads));
> > kafka.javaapi.OffsetRequest request = new
> > kafka.javaapi.OffsetRequest(
> > requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
> > clientName);
> > OffsetResponse response = consumer.getOffsetsBefore(request);
> >
> > if (response.hasError()) {
> > LOGGER.error("Error fetching data Offset Data the Broker.
> > Reason: " + response.errorCode(topic, partition) );
> > return 0;
> > }
> > long[] offsets = response.offsets(topic, partition);
> > return offsets[0];
> > }
> >
> >
> > --
> > Regards
> > Vamsi Subhash
> >
>
>
>
> --
> Regards
> Vamsi Subhash
>
Re: Offset of last un-consumed message
Posted by Achanta Vamsi Subhash <ac...@flipkart.com>.
Sorry. I want the first un-consumed message offset.
On Tue, Jun 17, 2014 at 2:53 PM, Achanta Vamsi Subhash <
achanta.vamsi@flipkart.com> wrote:
> Hi,
>
> I have a consumer group with multiple threads (high-level consumers) which
> read from a topic.
>
> I am also using a SimpleConsumer to read messages given a start offset. I
> am getting the offset as the last produced message using the below code.
> How to get the last un-consumed message?
>
> public long getLastOffset(SimpleConsumer consumer, String topic, int
> partition,
> long whichTime, String clientName) {
> TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
> partition);
> Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
> new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
> requestInfo.put(topicAndPartition, new
> PartitionOffsetRequestInfo(whichTime, maxReads));
> kafka.javaapi.OffsetRequest request = new
> kafka.javaapi.OffsetRequest(
> requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
> clientName);
> OffsetResponse response = consumer.getOffsetsBefore(request);
>
> if (response.hasError()) {
> LOGGER.error("Error fetching data Offset Data the Broker.
> Reason: " + response.errorCode(topic, partition) );
> return 0;
> }
> long[] offsets = response.offsets(topic, partition);
> return offsets[0];
> }
>
>
> --
> Regards
> Vamsi Subhash
>
--
Regards
Vamsi Subhash