Posted to dev@kafka.apache.org by senthil kumar <gu...@gmail.com> on 2017/09/28 18:47:42 UTC

KafkaSpout not consuming the first uncommitted offset data from kafka

Hi Kafka,

I have a Trident topology in Storm which consumes data from Kafka, and I am
seeing an issue in KafkaSpout: it is not consuming the very first
uncommitted offset from Kafka.

My Storm version is 1.1.1 and my Kafka version is 0.11.0.0. I have a topic,
say X, with 3 partitions.

I use the following configuration to consume data with KafkaSpout:


KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
    .builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
             PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
    .setProp(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "4194304")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG,
             PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
    .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "4194304")
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
    .build();

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new KafkaSpout<String, String>(kafkaConfig), 3);
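One thing worth double-checking in the config above: enable.auto.commit is
set to "true", which makes the underlying KafkaConsumer commit offsets in
the background on its own timer, independently of which tuples the spout
has actually acked. If the goal is for committed offsets to track processed
tuples, a variant with auto-commit disabled would look like this (a sketch
only, reusing the same property names as above):

```java
KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
    .builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
             PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
    .setProp(ConsumerConfig.GROUP_ID_CONFIG,
             PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
    // Let the spout commit offsets itself, based on acked tuples,
    // instead of the consumer's periodic background auto-commit.
    .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
    .build();
```

With auto-commit on, a background commit can land between poll and
processing, which is worth ruling out before suspecting the first-poll
strategy itself.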

Here are my test cases:

1. Started the processor with a new consumer group id. The very first time,
it starts reading from the latest offset. Fine.
2. Sent some messages to Kafka, and all of them were consumed by my Trident
topology.
3. Stopped my Trident topology.
4. Sent some messages to Kafka (partition_0), for example:
> msg_1
> msg_2
> msg_3
> msg_4
> msg_5

5. Started the topology. KafkaSpout starts consuming from msg_2; it does
not consume msg_1.
6. Stopped the topology.
7. Sent some messages to all the partitions (_0, _1, _2), for example:
Partition_0
> msg_6
> msg_7
> msg_8
Partition_1
> msg_9
> msg_10
> msg_11
Partition_2
> msg_12
> msg_13
> msg_14

8. Started the topology. KafkaSpout consumed the following messages:
> msg_7
> msg_8
> msg_10
> msg_11
> msg_13
> msg_14

It skipped the earliest uncommitted message in each partition.
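When reproducing steps like these, it helps to check which offset is
actually committed for the group before restarting the topology. Kafka
ships with a CLI for this (the broker address and group name below are
placeholders for your own values):

```shell
# Show committed offset (CURRENT-OFFSET) vs. log end offset (LOG-END-OFFSET)
# for each partition of the topics this group consumes.
bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-consumer-group
```

If CURRENT-OFFSET already points past msg_1 (or msg_6/msg_9/msg_12) before
the restart, the commit itself is wrong, not the first-poll strategy.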

Below is the definition of UNCOMMITTED_LATEST from the JavaDoc:

UNCOMMITTED_LATEST means that the kafka spout polls records from the last
committed offset, if any. If no offset has been committed, it behaves as
LATEST.

As per the definition, it should read from the last committed offset. But
it looks like it is reading from the earliest uncommitted offset + 1; the
consumer's position seems to be off by one.
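For reference, in the Kafka consumer API the committed offset is the offset
of the next record to be consumed, not the last one processed: after
processing the record at offset N, a consumer commits N + 1, and a restart
resumes exactly at that committed offset. A minimal sketch of that
arithmetic (plain Java, no broker needed; the class and method names are
just for illustration):

```java
public class CommitSemantics {

    // Kafka convention: after processing the record at offset lastProcessed,
    // the offset to commit is lastProcessed + 1 (the next record to consume).
    public static long offsetToCommit(long lastProcessed) {
        return lastProcessed + 1;
    }

    public static void main(String[] args) {
        long lastProcessed = 41L; // e.g. the offset of the last acked message
        long committed = offsetToCommit(lastProcessed);
        // A correct restart with UNCOMMITTED_LATEST should resume here,
        // i.e. at msg_1 in step 5 above -- not one past it.
        System.out.println("committed offset = " + committed);
    }
}
```

So if the first uncommitted message is skipped on restart, it suggests
either that lastProcessed + 2 was committed or that the spout seeks one
past the committed offset, which matches the off-by-one described above.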

Please have a look and let me know if anything is wrong in my tests.

I would appreciate a response even if this turns out not to be an issue.

Thanks,
Senthil

Re: KafkaSpout not consuming the first uncommitted offset data from kafka

Posted by Michael Noll <mi...@confluent.io>.
Hi Senthil,

You should ask this question on the Apache Storm mailing list.

At first sight this looks like a problem with Storm's KafkaSpout
implementation, not with Kafka.

Best wishes,
Michael



