You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Peter Chamberlain (JIRA)" <ji...@apache.org> on 2015/01/16 11:39:34 UTC

[jira] [Commented] (STORM-563) Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified

    [ https://issues.apache.org/jira/browse/STORM-563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14280074#comment-14280074 ] 

Peter Chamberlain commented on STORM-563:
-----------------------------------------

Is there any progress on this new patch?
It seems to me that the method should logically just do the following, and anything else is just plain confusing.

    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = config.startOffsetTime;
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

Also locally altered PartitionManager, as ignoring failure to read position from zookeeper is very dangerous if it makes everything get reprocessed everytime:

        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path +  "  --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
            // CHANGE HERE: throw this error rather than ignoring it
            throw e;
        }

Am not sure if this is wise or not though (haven't tested it).

> Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified
> -------------------------------------------------------------------------------------------
>
>                 Key: STORM-563
>                 URL: https://issues.apache.org/jira/browse/STORM-563
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Sriharsha Chintalapani
>            Assignee: Sriharsha Chintalapani
>
> KafkaUtil.getOffset starts from LatestTime unless forceFromStart specified. It should pick this from KafkaConfig.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)