Posted to dev@kafka.apache.org by Michael Noll <mi...@confluent.io> on 2017/10/18 08:23:00 UTC

Re: KafkaSpout not consuming the first uncommitted offset data from kafka

Hi Senthil,

you should ask this question on the Apache Storm mailing list.

At first sight this looks like a problem with Storm's KafkaSpout
implementation, not with Kafka.

Best wishes,
Michael




On Thu, Sep 28, 2017 at 8:47 PM, senthil kumar <gu...@gmail.com>
wrote:

> Hi Kafka,
>
> I have a Trident topology in Storm which consumes data from Kafka. I am now
> seeing an issue in KafkaSpout: it is not consuming the first uncommitted
> offset from Kafka.
>
> My Storm version is 1.1.1 and my Kafka version is 0.11.0.0. I have a topic,
> say X, with 3 partitions.
>
> I have the following configuration to consume data using KafkaSpout:
>
>
> KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig
>         .builder(PropertyUtil.getStringValue(PropertyUtil.KAFKA_BROKERS),
>                 PropertyUtil.getStringValue(PropertyUtil.TOPIC_NAME))
>         .setProp(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "4194304")
>         .setProp(ConsumerConfig.GROUP_ID_CONFIG,
>                 PropertyUtil.getStringValue(PropertyUtil.CONSUMER_ID))
>         .setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, "4194304")
>         .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
>         .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_LATEST)
>         .build();
>
> TopologyBuilder builder = new TopologyBuilder();
>
> builder.setSpout("spout", new KafkaSpout<String,String>(kafkaConfig),3);
>
> Following are my test cases
>
> 1. Started the processor with a new consumer id. The very first time, it
> starts reading from the latest offset. Fine.
> 2. Sent some messages to Kafka and saw all of them consumed by my Trident
> topology.
> 3. Stopped my Trident topology.
> 4. Sent some messages to Kafka (partition_0), for example:
> > msg_1
> > msg_2
> > msg_3
> > msg_4
> > msg_5
>
> 5. Started the topology. KafkaSpout consumed the data from msg_2 onwards; it
> did not consume msg_1.
> 6. Stopped the topology.
> 7. Sent some messages to all the partitions (_0, _1, _2) of the topic, for
> example:
> Partition_0
> > msg_6
> > msg_7
> > msg_8
> Partition_1
> > msg_9
> > msg_10
> > msg_11
> Partition_2
> > msg_12
> > msg_13
> > msg_14
>
> 8. Started the topology. KafkaSpout consumed the following messages:
> > msg_7
> > msg_8
> > msg_10
> > msg_11
> > msg_13
> > msg_14
>
> It skipped the earliest uncommitted message in each partition.
>
> Below is the definition of UNCOMMITTED_LATEST from the JavaDoc:
>
> UNCOMMITTED_LATEST means that the Kafka spout polls records from the last
> committed offset, if any. If no offset has been committed, it behaves as
> LATEST.
>
> Per this definition, it should read from the last committed offset. But it
> looks like it is reading from the earliest uncommitted offset + 1; the
> pointer seems to be off by one.
>
> Please have a look and let me know if anything is wrong in my tests.
>
> I would appreciate a response even if this turns out not to be an issue.
>
> Thanks,
> Senthil
>
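For context on the off-by-one described above: Kafka's convention (per the KafkaConsumer JavaDoc) is that the committed offset is the offset of the *next* record to be consumed, i.e. the last processed offset + 1, and a consumer resuming from a commit should start at exactly that committed offset. The sketch below is not Storm's actual KafkaSpout code; the class and method names are made up purely to illustrate how resuming one position *past* the committed offset would skip exactly one record per partition, matching the symptom in the tests above.

```java
// Hypothetical illustration of Kafka's offset-commit convention and the
// off-by-one resume bug described in the thread. Not real Storm code.
public class OffsetConvention {

    // Kafka convention: commit lastProcessedOffset + 1, i.e. the offset of
    // the next record to consume.
    static long offsetToCommit(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Correct resume: start at the committed offset itself.
    static long correctResumePosition(long committedOffset) {
        return committedOffset;
    }

    // Buggy interpretation: treating the committed offset as "already
    // processed" and resuming one position past it, skipping one record.
    static long buggyResumePosition(long committedOffset) {
        return committedOffset + 1;
    }

    public static void main(String[] args) {
        long lastProcessed = 41;                        // e.g. last record read before shutdown
        long committed = offsetToCommit(lastProcessed); // 42, the next record to consume
        System.out.println(correctResumePosition(committed)); // resumes at 42: no skip
        System.out.println(buggyResumePosition(committed));   // resumes at 43: one record skipped
    }
}
```

Under this reading, msg_1 in the test would live at the committed offset, and a spout that seeks to committed + 1 would begin with msg_2, which is exactly what was observed on every partition.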