Posted to dev@storm.apache.org by "Sachin Pasalkar (JIRA)" <ji...@apache.org> on 2016/05/28 12:10:12 UTC

[jira] [Commented] (STORM-563) Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified

    [ https://issues.apache.org/jira/browse/STORM-563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305326#comment-15305326 ] 

Sachin Pasalkar commented on STORM-563:
---------------------------------------

[~sriharsha] Due to this change, some of our changes are breaking. We hit the following case:
1) Our topologies had been running for a long time when we suddenly started getting an issue about not having enough data to calculate spout lag.
2) As a remediation we had to change the consumer group. Changing the consumer group caused duplicates to be processed in Trident.

In KafkaConfig
{code:java}
public long startOffsetTime = OffsetRequest.EarliestTime();
{code}
In the doEmitNewPartitionBatch API of storm.kafka.trident.TridentKafkaEmitter, if the consumer group is new, the call goes to line 109:
{code:java}
  if (lastMeta != null) {
/*  98 */       String lastInstanceId = null;
/*  99 */       Map lastTopoMeta = (Map)lastMeta.get("topology");
/* 100 */       if (lastTopoMeta != null)
/* 101 */         lastInstanceId = (String)lastTopoMeta.get("id");
/*     */       long offset;
/* 103 */       if ((_config.ignoreZkOffsets) && (!_topologyInstanceId.equals(lastInstanceId))) {
/* 104 */         offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config.startOffsetTime);
/*     */       } else {
/* 106 */         offset = ((Long)lastMeta.get("nextOffset")).longValue();
/*     */       }
/*     */     } else {
/* 109 */      offset = KafkaUtils.getOffset(consumer, _config.topic, partition, _config);
/*     */     }
{code}
That line calls the API below. As you can see, with the default startOffsetTime this call fetches the earliest data rather than the latest:
{code:java}
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)
{
    long startOffsetTime = config.startOffsetTime;
    return getOffset(consumer, topic, partition, startOffsetTime);
}
{code}
How it should be (this is how it was in the previous 0.9.x release):
{code:java}
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
    long startOffsetTime = kafka.api.OffsetRequest.LatestTime();

    if (config.ignoreZkOffsets) {
        startOffsetTime = config.startOffsetTime;
    }
    return getOffset(consumer, topic, partition, startOffsetTime);
}
{code}

Why do you think the spout should pick up from the beginning? It should do so only when specified. This change would also let a user ignore the ZooKeeper offsets and read data from a particular time if they want to.
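
To make the difference between the two getOffset variants concrete, here is a minimal standalone sketch with no Kafka or Storm dependency. The class and method names (StartOffsetSketch, currentStartTime, proposedStartTime) are hypothetical and exist only for illustration; the constants -1 and -2 are assumed to mirror kafka.api.OffsetRequest.LatestTime() and EarliestTime(). It models only which start time is chosen, not the actual offset lookup against a broker.

```java
// Illustration only: models the start-time choice, not the real storm-kafka code.
class StartOffsetSketch {
    // Assumed to match kafka.api.OffsetRequest.LatestTime() and EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Behaviour after STORM-563: the configured start time is always used,
    // regardless of the ignoreZkOffsets flag.
    static long currentStartTime(boolean ignoreZkOffsets, long configuredStartTime) {
        return configuredStartTime;
    }

    // Behaviour as in 0.9.x: start from the latest offset unless the user
    // explicitly asked to ignore the ZooKeeper offsets.
    static long proposedStartTime(boolean ignoreZkOffsets, long configuredStartTime) {
        return ignoreZkOffsets ? configuredStartTime : LATEST_TIME;
    }

    public static void main(String[] args) {
        // New consumer group, default config (startOffsetTime = EarliestTime).
        System.out.println("current:  " + currentStartTime(false, EARLIEST_TIME));
        System.out.println("proposed: " + proposedStartTime(false, EARLIEST_TIME));
        // The user opts in to reading from the configured time.
        System.out.println("opt-in:   " + proposedStartTime(true, EARLIEST_TIME));
    }
}
```

With the default config and a new consumer group, the first variant resolves to the earliest time and replays the topic, while the second resolves to the latest time and only falls back to the configured value when ignoreZkOffsets is set.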

 

> Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified
> -------------------------------------------------------------------------------------------
>
>                 Key: STORM-563
>                 URL: https://issues.apache.org/jira/browse/STORM-563
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>             Fix For: 0.10.0
>
>
> KafkaUtil.getOffset starts from LatestTime unless forceFromStart specified. It should pick this from KafkaConfig.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)