Posted to dev@storm.apache.org by Sachin Pasalkar <Sa...@symantec.com> on 2017/01/18 16:39:00 UTC

Re: [DISCUSS] New Kafka spout doesn't support seek to given offset

[Updated Subject]

In the 2nd case, since the user has changed group.id, there will be no history for the new id, so the code will automatically fall back to EARLIEST or LATEST.

I was able to code it up to a point: I can fetch data from a given offset (with a kind of hack). What I have seen is that when I provide an offset, it pulls the proper records. However, org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.subscribeKafkaConsumer() calls poll initially, which causes the offset to be updated to the latest. I guess we need to work on that on the Kafka side, so that the partition offset is updated when the user has provided one. If that works, we need only minimal code to pass the offset along.

Below are the cases I have tried:

  *   Don't provide an offset: it behaves normally.
  *   Request an offset lower than the current one: it works, with the caveat that it reads twice. The first read uses the offset that came from the subscribeKafkaConsumer call, but that data is dropped because the requested and actual positions differ. It then takes my provided value and works fine from there on.
  *   Request an offset higher than the current one: this is where I got blocked, because we are not setting the expected offset initially in the subscribeKafkaConsumer call, and my code keeps updating the offset to the user-provided one. I can put in a hack, but I am not sure how it will behave in all cases.

Below is the code I inserted in
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(TopicPartition, KafkaTridentSpoutBatchMetadata<K, V>), where startOffset is the offset value provided by the user.

if (startOffset != null && lastBatchMeta == null) {
    kafkaConsumer.seek(tp, startOffset + 1);  // seek offset provided by user
    LOG.debug("Seeking fetch offset provided by user");
} else if (lastBatchMeta != null) {
    kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
    LOG.debug("Seeking fetch offset to next offset after last offset from previous batch");
}...
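
To make the branch order easier to reason about and test without a broker, the decision can be pulled out into a small pure function. This is only a sketch: SeekOffsetSketch and resolveSeekOffset are hypothetical names, not part of storm-kafka-client. It also assumes the user wants the record at the requested offset included, so it seeks to the offset itself rather than to startOffset + 1 as in the snippet above (KafkaConsumer.seek sets the offset that the next poll fetches).

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of storm-kafka-client.
final class SeekOffsetSketch {

    /**
     * Decides which offset a partition should be positioned at.
     *
     * @param userStartOffset     offset requested by the user, or null if none was given
     * @param lastBatchLastOffset last offset of the previous batch, or null for the first batch
     * @return the offset to seek to, or empty if the consumer's current position should be kept
     */
    static OptionalLong resolveSeekOffset(Long userStartOffset, Long lastBatchLastOffset) {
        if (lastBatchLastOffset != null) {
            // A previous batch exists: continue right after it.
            return OptionalLong.of(lastBatchLastOffset + 1);
        }
        if (userStartOffset != null) {
            // First batch and the user asked for a start offset.
            // Assumption: the record at that offset should be included.
            return OptionalLong.of(userStartOffset);
        }
        // Nothing to go on: leave the position to the configured offset strategy.
        return OptionalLong.empty();
    }
}
```

The caller would then invoke kafkaConsumer.seek(tp, offset) only when a value is present, which keeps the "don't provide offset" case unchanged.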

On 18/01/17, 9:30 PM, "Hugo Da Cruz Louro" <hl...@hortonworks.com> wrote:

Hi Sachin,

The 2nd case can likely be handled with the committed offset, which is covered by UNCOMMITTED_EARLIEST or UNCOMMITTED_LATEST.

The 1st case may make sense, but even if you give the start offset, since Kafka polls a certain number of bytes rather than a specific number of records, it may not be trivial to guarantee that exactly the same dataset is polled each time.

However, if we as a community agree that it is useful to support your proposed feature, I have no particular objection to doing so.

Best,
Hugo
PS. We usually start discussion threads with the email subject prefixed with [DISCUSS].



On Jan 18, 2017, at 6:33 AM, Sachin Pasalkar <Sa...@symantec.com> wrote:
Hi,
I was looking at the code of the current KafkaTridentSpoutEmitter & KafkaSpout classes. Can we add functionality to start from a particular, user-provided offset? This would be useful in case the user wants to reprocess a particular data set. Another example: the user has changed the group id, knows where the old offset was committed, and wants to start processing from the same position.
Does this make sense? Or is it explicit that this will not be supported?
Thanks,
Sachin