Posted to dev@storm.apache.org by Sachin Pasalkar <Sa...@symantec.com> on 2016/05/26 07:48:57 UTC

Storm's Kafka spout is not reading latest data even with new consumer group

Currently, if you configure a new consumer group, the spout starts reading data from the earliest offset rather than the latest.

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();
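A topology can override this default explicitly. Below is a minimal sketch (assuming the storm-kafka 0.10.x classes storm.kafka.ZkHosts and storm.kafka.trident.TridentKafkaConfig; the ZooKeeper connect string, topic name and client id are placeholders, not taken from this thread):

import kafka.api.OffsetRequest;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.TridentKafkaConfig;

public class LatestOffsetConfigSketch {
    public static TridentKafkaConfig build() {
        // Placeholder ZooKeeper connect string, topic and client id.
        BrokerHosts hosts = new ZkHosts("zookeeper:2181");
        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, "my-topic", "my-client-id");
        // Override the EarliestTime() default so a brand-new consumer group starts at the latest offset.
        kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        return kafkaConfig;
    }
}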


In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if the consumer group is new (lastMeta is null), the call goes to line 109:

  if (lastMeta != null) {
/*  98 */       String lastInstanceId = null;
/*  99 */       Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */       if (lastTopoMeta != null)
/* 101 */         lastInstanceId = (String)lastTopoMeta.get("id");
/*     */       long offset;
/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */       } else {
/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */       }
/*     */     } else {
/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */     }

This calls the API below. As you can see, this call fetches the earliest data rather than the latest:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}



How it should be (this logic was present in the previous 0.9.x release):

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}
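To make the difference concrete, here is a small self-contained illustration of the proposed decision logic (a hypothetical sketch, not the actual storm-kafka code; the constants stand in for the well-known kafka.api.OffsetRequest sentinel values):

public class ProposedOffsetLogic {
    static final long LATEST_TIME = -1L;    // kafka.api.OffsetRequest.LatestTime()
    static final long EARLIEST_TIME = -2L;  // kafka.api.OffsetRequest.EarliestTime()

    // Mirrors the 0.9.x-style getOffset(): default to latest, and honour the
    // configured startOffsetTime only when ignoreZkOffsets is set.
    static long chooseStartOffsetTime(boolean ignoreZkOffsets, long configuredStartOffsetTime) {
        long startOffsetTime = LATEST_TIME;
        if (ignoreZkOffsets) {
            startOffsetTime = configuredStartOffsetTime;
        }
        return startOffsetTime;
    }

    public static void main(String[] args) {
        // New consumer group, ignoreZkOffsets = false -> starts at the latest offset.
        System.out.println(chooseStartOffsetTime(false, EARLIEST_TIME)); // prints -1
        // ignoreZkOffsets = true -> honours the configured EarliestTime().
        System.out.println(chooseStartOffsetTime(true, EARLIEST_TIME));  // prints -2
    }
}

With ignoreZkOffsets left at its default of false, a brand-new consumer group would start at the latest offset; setting it to true restores the read-from-configured-offset behaviour.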



This code was present earlier, but at some point it was removed. I tried to search on GitHub but could not find the history of the change.

Thanks,

Sachin

Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Abhishek Agarwal <ab...@gmail.com>.
" IgnorezkOffset is true then should read from start of segment, if
consumer is new it should read from latest."
It varies from use case to use case, which is probably why it has been
made configurable. Personally, I find the current Kafka spout configuration
settings simpler and more flexible.

On Mon, May 30, 2016 at 3:07 PM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com> wrote:

> Does this make sense? I believe we shouldn’t  ask dev/devops to
> add/remember configuration. It should work by default by what it means e.g.
> IgnorezkOffset is true then should read from start of segment, if consumer
> is new it should read from latest. User shouldn’t has to mention any config
> to get it work.


-- 
Regards,
Abhishek Agarwal

Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Does this make sense? I believe we shouldn’t  ask dev/devops to add/remember configuration. It should work by default by what it means e.g. IgnorezkOffset is true then should read from start of segment, if consumer is new it should read from latest. User shouldn’t has to mention any config to get it work.

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Monday, 30 May 2016 12:00 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>, Abhishek Agarwal <ab...@gmail.com>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

What I am suggesting was there in the 0.9.x version; it was removed when the variable was renamed from forceFromStart to ignoreZkOffsets.

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Monday, 30 May 2016 11:44 am
To: Abhishek Agarwal <ab...@gmail.com>
Cc: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Mostly in PROD, ignoreZkOffsets is false. However, in testing a user may want to read data from the start of the segment (maybe to debug, or for some other purpose). What I am talking about is from a less-config-management perspective: if the user sets ignoreZkOffsets to true, by default it will read from the start of the segment; otherwise the user can explicitly configure config.startOffsetTime to read from a specific offset.

From: Abhishek Agarwal <ab...@gmail.com>
Date: Monday, 30 May 2016 10:53 am
To: Sachin Pasalkar <sa...@symantec.com>
Cc: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

How about setting ignoreZkOffsets to false? config.startOffsetTime indicates which offset to start from when there is no prior information about the offsets. If ignoreZkOffsets is true, it is intended to read from whatever is specified in the configuration.

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar <Sa...@symantec.com> wrote:
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code (doEmitNewPartitionBatch API), setting the time to latest will lose the effect of ignoreZkOffsets: if ignoreZkOffsets is set to true, it will still read from the latest, which the user will not expect.


/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */       } else {
/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */       }
/*     */     } else {
/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */     }

From: Abhishek Agarwal <ab...@gmail.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <Na...@symantec.com>

Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <Sachin_Pasalkar@symantec.com> wrote:

I looked at the discussion thread. It looks like the user made this change so a new
consumer will start reading data from the earliest offset rather than the latest.
They haven't considered the case below: if there are changes in the data and the
user forgot to clear the old data from the Kafka topic, it will cause a mess (if the
user starts with a new consumer, the user will expect to read from the latest, or can
set the offset explicitly). Setting it to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <evans@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Looks like it changed as a part of
https://issues.apache.org/jira/browse/STORM-563.  That might be a good
place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of
forceFromStart, but I have not dug into the exact details of the change to
know what all the ramifications might have been.
- Bobby

     On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sachin_Pasalkar@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <dev@storm.apache.org>
Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?





Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
What I am suggesting was there in the 0.9.x version; it was removed when the variable was renamed from forceFromStart to ignoreZkOffsets.




Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Mostly in PROD, ignoreZkOffsets is false. However, in testing a user may want to read data from the start of the segment (maybe to debug, or for some other purpose). What I am talking about is from a less-config-management perspective: if the user sets ignoreZkOffsets to true, by default it will read from the start of the segment; otherwise the user can explicitly configure config.startOffsetTime to read from a specific offset.
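As a concrete illustration, the testing scenario above would be configured roughly like this (a sketch only; it assumes the usual storm.kafka.ZkHosts and storm.kafka.trident.TridentKafkaConfig imports, and the ZooKeeper address and topic name are placeholders):

// Hypothetical testing configuration: ignore any offsets stored in ZooKeeper and
// replay the topic from the start of the segment.
TridentKafkaConfig kafkaConfig =
        new TridentKafkaConfig(new ZkHosts("zookeeper:2181"), "my-topic");
kafkaConfig.ignoreZkOffsets = true;                                   // skip stored ZK offsets
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // start of the segment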

From: Abhishek Agarwal <ab...@gmail.com>>
Date: Monday, 30 May 2016 10:53 am
To: Sachin Pasalkar <sa...@symantec.com>>
Cc: "dev@storm.apache.org<ma...@storm.apache.org>" <de...@storm.apache.org>>, Bobby Evans <ev...@yahoo-inc.com>>, Narendra Bidari <Na...@symantec.com>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

how about setting ignoreZkOffsets to false? config.startOffsetTime indicates which offset to start from if there is no prior information about the offsets. if ignoreZkOffsets is true, it is intended to read from what is specified in configuration.

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar <Sa...@symantec.com>> wrote:
Not really. If you look at storm.kafka.trident.TridentKafkaEmitter code (doEmitNewPartitionBatch API). Setting time to latest will loose impact of ignoreZkOffsets. If ignoreZkOffsets is set to true it will still read from latest which user will not expect to happen.


/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*     */       } else {

/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*     */       }

/*     */     } else {

/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*     */     }

From: Abhishek Agarwal <ab...@gmail.com>>
Reply-To: "dev@storm.apache.org<ma...@storm.apache.org>" <de...@storm.apache.org>>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org<ma...@storm.apache.org>" <de...@storm.apache.org>>
Cc: Bobby Evans <ev...@yahoo-inc.com>>, Narendra Bidari <Na...@symantec.com>>

Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com<ma...@symantec.com>> wrote:

I looked at discussion thread. It looks like user did this changes so new
consumer will start reading data from earliest offset rather than latest.
They haven’t consider below case as well if the there is changes in data &
user forgot to clear old data from kafka topic it will cause mess (If user
start with new consumer user will expect to read it from latest OR he can
set offset explicitly) Setting to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sa...@symantec.com><mailto:
sachin_pasalkar@symantec.com<ma...@symantec.com>>>
Reply-To: "dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>" <
dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>" <
dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>>, Bobby Evans <
evans@yahoo-inc.com<ma...@yahoo-inc.com><mailto:evans@yahoo-inc.com%3E>>
Cc: Narendra Bidari <Na...@symantec.com><mailto:
Narendra_Bidari@symantec.com<ma...@symantec.com>>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID><mailto:
evans@yahoo-inc.com.INVALID<ma...@yahoo-inc.com.INVALID><mailto:evans@yahoo-inc.com.INVALID%3E>>
Reply-To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>" <de...@storm.apache.org><mailto:dev@storm.apache.org
><ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>>, Bobby Evans <ev...@yahoo-inc.com><mailto:
evans@yahoo-inc.com<ma...@yahoo-inc.com><mailto:evans@yahoo-inc.com%3E>>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>" <de...@storm.apache.org><mailto:dev@storm.apache.org
><ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>>
Cc: Narendra Bidari <Na...@symantec.com><mailto:
Narendra_Bidari@symantec.com<ma...@symantec.com><mailto:Narendra_Bidari@symantec.com%3E>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Looks like it changed as a part of
https://issues.apache.org/jira/browse/STORM-563.  That might be a good
place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of
forceFromStart, but I have not dug into the exact details of the change to
know what all the ramifications might have been.
- Bobby

     On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com<ma...@symantec.com><mailto<mailto:Sachin_Pasalkar@symantec.com%3E%3Cmailto>:
Sachin_Pasalkar@symantec.com<ma...@symantec.com>>> wrote:

Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com><mailto:
sachin_pasalkar@symantec.com<ma...@symantec.com><mailto<mailto:sachin_pasalkar@symantec.com%3E%3Cmailto>:
sachin_pasalkar@symantec.com<ma...@symantec.com>>>
Reply-To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>" <de...@storm.apache.org>
<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org><mailto:dev@storm.apache.org%3E>" <de...@storm.apache.org>
<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>>
Cc: Narendra Bidari <Na...@symantec.com><mailto:
Narendra_Bidari@symantec.com<ma...@symantec.com><mailto<mailto:Narendra_Bidari@symantec.com%3E%3Cmailto>:
Narendra_Bidari@symantec.com<ma...@symantec.com>>>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com><mailto:
sachin_pasalkar@symantec.com<ma...@symantec.com><mailto<mailto:sachin_pasalkar@symantec.com%3E%3Cmailto>:
sachin_pasalkar@symantec.com<ma...@symantec.com><mailto:sachin_pasalkar@symantec.com%3E>>
Reply-To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>" <de...@storm.apache.org><mailto:dev@storm.apache.org
><ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>" <de...@storm.apache.org><mailto:dev@storm.apache.org
><ma...@storm.apache.org><mailto<mailto:dev@storm.apache.org%3E%3Cmailto:dev@storm.apache.org%3E%3Cmailto>:
dev@storm.apache.org<ma...@storm.apache.org>>>
Cc: Narendra Bidari <Na...@symantec.com><mailto:
Narendra_Bidari@symantec.com<ma...@symantec.com><mailto<mailto:Narendra_Bidari@symantec.com%3E%3Cmailto>:
Narendra_Bidari@symantec.com<ma...@symantec.com><mailto:Narendra_Bidari@symantec.com%3E>>
Subject: Storm's Kafka spout is not reading latest data even with new
consumer group

Currently if you give the latest consumer group it starts reading data
from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter,
if consumer group is null calls goes to 109 line

   if (lastMeta != null) {

/*  98 */      String lastInstanceId = null;

/*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */      if (lastTopoMeta != null)

/* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");

/*    */      long offset;

/* 103 */      if ((_config.ignoreZkOffsets) &&
(!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic,
partition, _config.startOffsetTime);

/*    */      } else {

/* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*    */      }

/*    */    } else {

/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic,
partition, _config);

/*    */    }

Which calls below API. As you can see this call will fetch earliest data
rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int
partition, KafkaConfig config)


{


     long startOffsetTime = config.startOffsetTime;


     return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int
partition, KafkaConfig config) {


         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


         if ( config.ignoreZkOffsets) {


             startOffsetTime = config.startOffsetTime;


         }


         return getOffset(consumer, topic, partition, startOffsetTime);


     }



This code was earlier present but somehow it got removed. I tried to
search on github but didn't found history of change.

Thanks,

Sachin








--
Regards,
Abhishek Agarwal






Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Abhishek Agarwal <ab...@gmail.com>.
How about setting ignoreZkOffsets to false? config.startOffsetTime indicates
which offset to start from when there is no prior information about the
offsets. If ignoreZkOffsets is true, the spout is intended to read from
whatever is specified in the configuration.
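
Concretely, that combination would look roughly like this (a minimal sketch,
assuming the pre-2.x storm-kafka Trident API this thread refers to; the class
name, ZooKeeper connect string and topic name are placeholders):

import kafka.api.OffsetRequest;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;

public class LatestOffsetSpoutSketch {
    public static OpaqueTridentKafkaSpout buildSpout() {
        BrokerHosts hosts = new ZkHosts("zkhost:2181");                              // placeholder ZK connect string
        TridentKafkaConfig spoutConfig = new TridentKafkaConfig(hosts, "my-topic");  // placeholder topic

        // Used only when no offset has been stored yet (a brand-new consumer /
        // first deployment): start from the latest offset instead of the earliest default.
        spoutConfig.startOffsetTime = OffsetRequest.LatestTime();

        // Offsets already stored for this topology are still honoured,
        // so a redeploy resumes from where it left off.
        spoutConfig.ignoreZkOffsets = false;

        return new OpaqueTridentKafkaSpout(spoutConfig);
    }
}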

On Mon, May 30, 2016 at 9:35 AM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com> wrote:

> Not really. If you look at storm.kafka.trident.TridentKafkaEmitter code (doEmitNewPartitionBatch
> API). Setting time to latest will loose impact of ignoreZkOffsets. If
> ignoreZkOffsets is set to true it will still read from latest which user
> will not expect to happen.
>
> /* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId
> .equals(lastInstanceId))) {
>
> /* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config.startOffsetTime);
>
> /*     */       } else {
>
> /* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();
>
> /*     */       }
>
> /*     */     } else {
>
> /* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config);
>
> /*     */     }
>
> From: Abhishek Agarwal <ab...@gmail.com>
> Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
> Date: Sunday, 29 May 2016 8:32 pm
> To: "dev@storm.apache.org" <de...@storm.apache.org>
> Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <
> Narendra_Bidari@symantec.com>
>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
> work?
>
> On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <
> Sachin_Pasalkar@symantec.com> wrote:
>
> I looked at discussion thread. It looks like user did this changes so new
> consumer will start reading data from earliest offset rather than latest.
> They haven’t consider below case as well if the there is changes in data &
> user forgot to clear old data from kafka topic it will cause mess (If user
> start with new consumer user will expect to read it from latest OR he can
> set offset explicitly) Setting to earliest is more error prone in PROD.
>
> Thoughts?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Saturday, 28 May 2016 5:12 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Thanks Bobby. I will ask on ticket.
>
> From: Bobby Evans <evans@yahoo-inc.com.INVALID>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
> Date: Friday, 27 May 2016 7:45 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Looks like it changed as a part of
> https://issues.apache.org/jira/browse/STORM-563.  That might be a good
> place to ask.
> Specifically it was pull request https://github.com/apache/storm/pull/493.
> To me it looks like the code was updated to use ignoreZKOffsets instead of
> forceFromStart, but I have not dug into the exact details of the change to
> know what all the ramifications might have been.
> - Bobby
>
>      On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sachin_Pasalkar@symantec.com> wrote:
>
> Can you look at this please?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Thursday, 26 May 2016 9:35 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Can anyone look at this?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Thursday, 26 May 2016 1:18 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Currently if you give the latest consumer group it starts reading data
> from earliest offset rather than latest
>
> In KafkaConfig
>
> public long startOffsetTime = OffsetRequest.EarliestTime();
>
>
> In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter,
> if consumer group is null calls goes to 109 line
>
>    if (lastMeta != null) {
>
> /*  98 */      String lastInstanceId = null;
>
> /*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");
>
> /* 100 */      if (lastTopoMeta != null)
>
> /* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");
>
> /*    */      long offset;
>
> /* 103 */      if ((_config.ignoreZkOffsets) &&
> (!_topologyInstanceId.equals(lastInstanceId))) {
>
> /* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config.startOffsetTime);
>
> /*    */      } else {
>
> /* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();
>
> /*    */      }
>
> /*    */    } else {
>
> /* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config);
>
> /*    */    }
>
> Which calls below API. As you can see this call will fetch earliest data
> rather than fetching latest
>
> public static long getOffset(SimpleConsumer consumer, String topic, int
> partition, KafkaConfig config)
>
>
> {
>
>
>      long startOffsetTime = config.startOffsetTime;
>
>
>      return getOffset(consumer, topic, partition, startOffsetTime);
>
>
>
>
>
>
>
>
> }
>
>
>
> How it should be (It was there in previous release 0.9.x)
>
> public static long getOffset(SimpleConsumer consumer, String topic, int
> partition, KafkaConfig config) {
>
>
>          long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>
>
>          if ( config.ignoreZkOffsets) {
>
>
>              startOffsetTime = config.startOffsetTime;
>
>
>          }
>
>
>          return getOffset(consumer, topic, partition, startOffsetTime);
>
>
>      }
>
>
>
> This code was earlier present but somehow it got removed. I tried to
> search on github but didn't found history of change.
>
> Thanks,
>
> Sachin
>
>
>
>
>
>
>
>
> --
> Regards,
> Abhishek Agarwal
>
>


-- 
Regards,
Abhishek Agarwal

Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Not really. If you look at the storm.kafka.trident.TridentKafkaEmitter code (doEmitNewPartitionBatch API), setting the time to latest will lose the effect of ignoreZkOffsets: if ignoreZkOffsets is set to true, it will still read from the latest offset, which the user will not expect to happen.


/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*     */       } else {

/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*     */       }

/*     */     } else {

/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*     */     }
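
In other words, the rule being argued for here is the 0.9.x behaviour: only a
brand-new consumer (no stored offset at all) should fall back to the latest
offset, while ignoreZkOffsets keeps its meaning of "jump to the configured
startOffsetTime". A rough paraphrase of that selection logic (a sketch, not an
actual patch; storedNextOffset and newTopologyInstance stand in for the
"nextOffset" metadata and the _topologyInstanceId comparison in the snippet
above):

// Paraphrased offset-selection rule -- not actual storm-kafka code.
static long chooseStartOffset(kafka.javaapi.consumer.SimpleConsumer consumer,
                              String topic, int partition,
                              storm.kafka.KafkaConfig config,
                              Long storedNextOffset, boolean newTopologyInstance) {
    if (storedNextOffset == null) {
        // Brand-new consumer: nothing stored anywhere -> start from the latest offset.
        return storm.kafka.KafkaUtils.getOffset(consumer, topic, partition,
                kafka.api.OffsetRequest.LatestTime());
    }
    if (config.ignoreZkOffsets && newTopologyInstance) {
        // Explicit override: jump to whatever startOffsetTime was configured.
        return storm.kafka.KafkaUtils.getOffset(consumer, topic, partition,
                config.startOffsetTime);
    }
    // Normal case: resume from the stored offset.
    return storedNextOffset;
}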

From: Abhishek Agarwal <ab...@gmail.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Sunday, 29 May 2016 8:32 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Bobby Evans <ev...@yahoo-inc.com>, Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com> wrote:

I looked at discussion thread. It looks like user did this changes so new
consumer will start reading data from earliest offset rather than latest.
They haven’t consider below case as well if the there is changes in data &
user forgot to clear old data from kafka topic it will cause mess (If user
start with new consumer user will expect to read it from latest OR he can
set offset explicitly) Setting to earliest is more error prone in PROD.

Thoughts?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Looks like it changed as a part of
https://issues.apache.org/jira/browse/STORM-563.  That might be a good
place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of
forceFromStart, but I have not dug into the exact details of the change to
know what all the ramifications might have been.
- Bobby

     On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sachin_Pasalkar@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new
consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new
consumer group

Currently if you give the latest consumer group it starts reading data
from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter,
if consumer group is null calls goes to 109 line

   if (lastMeta != null) {

/*  98 */      String lastInstanceId = null;

/*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */      if (lastTopoMeta != null)

/* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");

/*    */      long offset;

/* 103 */      if ((_config.ignoreZkOffsets) &&
(!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic,
partition, _config.startOffsetTime);

/*    */      } else {

/* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*    */      }

/*    */    } else {

/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic,
partition, _config);

/*    */    }

Which calls below API. As you can see this call will fetch earliest data
rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int
partition, KafkaConfig config)


{


     long startOffsetTime = config.startOffsetTime;


     return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int
partition, KafkaConfig config) {


         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


         if ( config.ignoreZkOffsets) {


             startOffsetTime = config.startOffsetTime;


         }


         return getOffset(consumer, topic, partition, startOffsetTime);


     }



This code was earlier present but somehow it got removed. I tried to
search on github but didn't found history of change.

Thanks,

Sachin








--
Regards,
Abhishek Agarwal


Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Abhishek Agarwal <ab...@gmail.com>.
does setting kafkaConfig.startOffsetTime = OffsetRequest.LatestTime() not
work?
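
That is, in the topology setup, before the spout is constructed (a one-line
sketch; "spoutConfig" stands for the KafkaConfig/TridentKafkaConfig instance
used by the topology):

// Start a consumer that has no stored offsets from the latest offset.
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();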

On Sun, May 29, 2016 at 7:56 PM, Sachin Pasalkar <
Sachin_Pasalkar@symantec.com> wrote:

> I looked at discussion thread. It looks like user did this changes so new
> consumer will start reading data from earliest offset rather than latest.
> They haven’t consider below case as well if the there is changes in data &
> user forgot to clear old data from kafka topic it will cause mess (If user
> start with new consumer user will expect to read it from latest OR he can
> set offset explicitly) Setting to earliest is more error prone in PROD.
>
> Thoughts?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Saturday, 28 May 2016 5:12 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Thanks Bobby. I will ask on ticket.
>
> From: Bobby Evans <evans@yahoo-inc.com.INVALID>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>, Bobby Evans <evans@yahoo-inc.com>
> Date: Friday, 27 May 2016 7:45 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Looks like it changed as a part of
> https://issues.apache.org/jira/browse/STORM-563.  That might be a good
> place to ask.
> Specifically it was pull request https://github.com/apache/storm/pull/493.
> To me it looks like the code was updated to use ignoreZKOffsets instead of
> forceFromStart, but I have not dug into the exact details of the change to
> know what all the ramifications might have been.
> - Bobby
>
>     On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sachin_Pasalkar@symantec.com> wrote:
>
> Can you look at this please?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Thursday, 26 May 2016 9:35 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Re: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Can anyone look at this?
>
> From: Sachin Pasalkar <sachin_pasalkar@symantec.com>
> Reply-To: "dev@storm.apache.org" <dev@storm.apache.org>
> Date: Thursday, 26 May 2016 1:18 pm
> To: "dev@storm.apache.org" <dev@storm.apache.org>
> Cc: Narendra Bidari <Narendra_Bidari@symantec.com>
> Subject: Storm's Kafka spout is not reading latest data even with new
> consumer group
>
> Currently if you give the latest consumer group it starts reading data
> from earliest offset rather than latest
>
> In KafkaConfig
>
> public long startOffsetTime = OffsetRequest.EarliestTime();
>
>
> In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter,
> if consumer group is null calls goes to 109 line
>
>   if (lastMeta != null) {
>
> /*  98 */      String lastInstanceId = null;
>
> /*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");
>
> /* 100 */      if (lastTopoMeta != null)
>
> /* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");
>
> /*    */      long offset;
>
> /* 103 */      if ((_config.ignoreZkOffsets) &&
> (!_topologyInstanceId.equals(lastInstanceId))) {
>
> /* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config.startOffsetTime);
>
> /*    */      } else {
>
> /* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();
>
> /*    */      }
>
> /*    */    } else {
>
> /* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic,
> partition, _config);
>
> /*    */    }
>
> Which calls below API. As you can see this call will fetch earliest data
> rather than fetching latest
>
> public static long getOffset(SimpleConsumer consumer, String topic, int
> partition, KafkaConfig config)
>
>
> {
>
>
>     long startOffsetTime = config.startOffsetTime;
>
>
>     return getOffset(consumer, topic, partition, startOffsetTime);
>
>
>
>
>
>
>
>
> }
>
>
>
> How it should be (It was there in previous release 0.9.x)
>
> public static long getOffset(SimpleConsumer consumer, String topic, int
> partition, KafkaConfig config) {
>
>
>         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
>
>
>         if ( config.ignoreZkOffsets) {
>
>
>             startOffsetTime = config.startOffsetTime;
>
>
>         }
>
>
>         return getOffset(consumer, topic, partition, startOffsetTime);
>
>
>     }
>
>
>
> This code was earlier present but somehow it got removed. I tried to
> search on github but didn't found history of change.
>
> Thanks,
>
> Sachin
>
>
>
>
>
>


-- 
Regards,
Abhishek Agarwal

Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
I looked at the discussion thread. It looks like the user made this change so that a new consumer will start reading data from the earliest offset rather than the latest. They haven't considered the case below as well: if there is a change in the data and the user forgot to clear old data from the Kafka topic, it will cause a mess (if the user starts with a new consumer, they will expect to read from the latest offset, or they can set the offset explicitly). Defaulting to earliest is more error prone in PROD.

Thoughts?
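
For the "set the offset explicitly" option mentioned above, a sketch (assuming
startOffsetTime is passed straight through to the Kafka offset API, as the
getOffset code quoted earlier in the thread suggests; "spoutConfig" is the
topology's KafkaConfig instance):

// Start roughly from messages written after this point in time (epoch millis).
// The old SimpleConsumer offset API resolves a timestamp only at log-segment
// granularity, so treat this as approximate.
spoutConfig.startOffsetTime = System.currentTimeMillis() - 60 * 60 * 1000L;  // about one hour back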

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Saturday, 28 May 2016 5:12 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Looks like it changed as a part of https://issues.apache.org/jira/browse/STORM-563.  That might be a good place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of forceFromStart, but I have not dug into the exact details of the change to know what all the ramifications might have been.
- Bobby

    On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sa...@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently if you give the latest consumer group it starts reading data from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if consumer group is null calls goes to 109 line

  if (lastMeta != null) {

/*  98 */      String lastInstanceId = null;

/*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */      if (lastTopoMeta != null)

/* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");

/*    */      long offset;

/* 103 */      if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*    */      } else {

/* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*    */      }

/*    */    } else {

/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*    */    }

Which calls below API. As you can see this call will fetch earliest data rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)


{


    long startOffsetTime = config.startOffsetTime;


    return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {


        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


        if ( config.ignoreZkOffsets) {


            startOffsetTime = config.startOffsetTime;


        }


        return getOffset(consumer, topic, partition, startOffsetTime);


    }



This code was earlier present but somehow it got removed. I tried to search on github but didn't found history of change.

Thanks,

Sachin






Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Thanks Bobby. I will ask on ticket.

From: Bobby Evans <ev...@yahoo-inc.com.INVALID>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>, Bobby Evans <ev...@yahoo-inc.com>
Date: Friday, 27 May 2016 7:45 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Looks like it changed as a part of https://issues.apache.org/jira/browse/STORM-563.  That might be a good place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of forceFromStart, but I have not dug into the exact details of the change to know what all the ramifications might have been.
 - Bobby

    On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sa...@symantec.com> wrote:

Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently if you give the latest consumer group it starts reading data from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if consumer group is null calls goes to 109 line

  if (lastMeta != null) {

/*  98 */      String lastInstanceId = null;

/*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */      if (lastTopoMeta != null)

/* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");

/*    */      long offset;

/* 103 */      if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*    */      } else {

/* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*    */      }

/*    */    } else {

/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*    */    }

Which calls below API. As you can see this call will fetch earliest data rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)


{


    long startOffsetTime = config.startOffsetTime;


    return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {


        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


        if ( config.ignoreZkOffsets) {


            startOffsetTime = config.startOffsetTime;


        }


        return getOffset(consumer, topic, partition, startOffsetTime);


    }



This code was earlier present but somehow it got removed. I tried to search on github but didn't found history of change.

Thanks,

Sachin





Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Bobby Evans <ev...@yahoo-inc.com.INVALID>.
Looks like it changed as a part of https://issues.apache.org/jira/browse/STORM-563.  That might be a good place to ask.
Specifically it was pull request https://github.com/apache/storm/pull/493.
To me it looks like the code was updated to use ignoreZKOffsets instead of forceFromStart, but I have not dug into the exact details of the change to know what all the ramifications might have been.
 - Bobby 
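
For readers comparing the two releases, the rename looks roughly like this (a
sketch based on the description above; "spoutConfig" stands for the topology's
KafkaConfig, OffsetRequest is kafka.api.OffsetRequest, and exact defaults may
differ between releases):

// storm-kafka 0.9.x
spoutConfig.forceFromStart = true;                           // ignore any offset stored in ZooKeeper...
spoutConfig.startOffsetTime = OffsetRequest.EarliestTime();  // ...and start from startOffsetTime

// storm-kafka after STORM-563
spoutConfig.ignoreZkOffsets = true;                          // same intent, renamed flag
spoutConfig.startOffsetTime = OffsetRequest.EarliestTime();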

    On Thursday, May 26, 2016 10:13 PM, Sachin Pasalkar <Sa...@symantec.com> wrote:
 

 Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently if you give the latest consumer group it starts reading data from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if consumer group is null calls goes to 109 line

  if (lastMeta != null) {

/*  98 */      String lastInstanceId = null;

/*  99 */      Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */      if (lastTopoMeta != null)

/* 101 */        lastInstanceId = (String)lastTopoMeta.get("id");

/*    */      long offset;

/* 103 */      if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */        offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*    */      } else {

/* 106 */        offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*    */      }

/*    */    } else {

/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*    */    }

Which calls below API. As you can see this call will fetch earliest data rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)


{


    long startOffsetTime = config.startOffsetTime;


    return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {


        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


        if ( config.ignoreZkOffsets) {


            startOffsetTime = config.startOffsetTime;


        }


        return getOffset(consumer, topic, partition, startOffsetTime);


    }



This code was earlier present but somehow it got removed. I tried to search on github but didn't found history of change.

Thanks,

Sachin



  

Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Can you look at this please?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 9:35 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Re: Storm's Kafka spout is not reading latest data even with new consumer group

Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently if you give the latest consumer group it starts reading data from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if consumer group is null calls goes to 109 line

  if (lastMeta != null) {

/*  98 */       String lastInstanceId = null;

/*  99 */       Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */       if (lastTopoMeta != null)

/* 101 */         lastInstanceId = (String)lastTopoMeta.get("id");

/*     */       long offset;

/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*     */       } else {

/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*     */       }

/*     */     } else {

/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*     */     }

Which calls below API. As you can see this call will fetch earliest data rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)


{


    long startOffsetTime = config.startOffsetTime;


    return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {


        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


        if ( config.ignoreZkOffsets) {


            startOffsetTime = config.startOffsetTime;


        }


        return getOffset(consumer, topic, partition, startOffsetTime);


    }



This code was earlier present but somehow it got removed. I tried to search on github but didn't found history of change.

Thanks,

Sachin



Re: Storm's Kafka spout is not reading latest data even with new consumer group

Posted by Sachin Pasalkar <Sa...@symantec.com>.
Can anyone look at this?

From: Sachin Pasalkar <sa...@symantec.com>
Reply-To: "dev@storm.apache.org" <de...@storm.apache.org>
Date: Thursday, 26 May 2016 1:18 pm
To: "dev@storm.apache.org" <de...@storm.apache.org>
Cc: Narendra Bidari <Na...@symantec.com>
Subject: Storm's Kafka spout is not reading latest data even with new consumer group

Currently if you give the latest consumer group it starts reading data from earliest offset rather than latest

In KafkaConfig

public long startOffsetTime = OffsetRequest.EarliestTime();


In doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if consumer group is null calls goes to 109 line

  if (lastMeta != null) {

/*  98 */       String lastInstanceId = null;

/*  99 */       Map lastTopoMeta = (Map)lastMeta.get("topology");

/* 100 */       if (lastTopoMeta != null)

/* 101 */         lastInstanceId = (String)lastTopoMeta.get("id");

/*     */       long offset;

/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {

/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);

/*     */       } else {

/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();

/*     */       }

/*     */     } else {

/* 109 */       offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);

/*     */     }

Which calls below API. As you can see this call will fetch earliest data rather than fetching latest

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)


{


    long startOffsetTime = config.startOffsetTime;


    return getOffset(consumer, topic, partition, startOffsetTime);








}



How it should be (It was there in previous release 0.9.x)

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {


        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();


        if ( config.ignoreZkOffsets) {


            startOffsetTime = config.startOffsetTime;


        }


        return getOffset(consumer, topic, partition, startOffsetTime);


    }



This code was earlier present but somehow it got removed. I tried to search on github but didn't found history of change.

Thanks,

Sachin