Posted to dev@kafka.apache.org by "Will Funnell (JIRA)" <ji...@apache.org> on 2015/09/10 18:26:46 UTC

[jira] [Comment Edited] (KAFKA-2500) Make logEndOffset available in the 0.8.3 Consumer

    [ https://issues.apache.org/jira/browse/KAFKA-2500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14739059#comment-14739059 ] 

Will Funnell edited comment on KAFKA-2500 at 9/10/15 4:26 PM:
--------------------------------------------------------------

If it helps to understand our use case, this is the API we have implemented for consuming messages, using Reactive Streams.

{code}
public interface KafkaClient<T> {

    /**
     * Obtains a finite stream, intended for log-compacted topics, that provides a
     * sequenced stream of all messages from the previously saved position,
     * publishing them according to the demand received from its Subscriber and
     * completing once each message has been read exactly once.
     */
    Publisher<T> retrieveAll();

    /**
     * Obtains an infinite stream that provides a sequenced stream of all messages
     * from the previously saved position, publishing them according to the demand
     * received from its Subscriber.
     */
    Publisher<T> retrieve();

    /**
     * Resets the position from which messages are consumed back to the beginning.
     *
     * @throws ResetFailed if the position could not be reset
     */
    void reset() throws ResetFailed;
}
{code}
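To make the retrieveAll() contract concrete, here is a minimal, self-contained sketch. All names are hypothetical: the Reactive Streams Publisher is reduced to a plain callback and the partition log is an in-memory list, since the real client would wrap a Kafka consumer. The key point is that the end offset is captured up front, so the stream is finite and completes even if producers keep appending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy stand-in for a single log-compacted partition: the message at index i
// has offset i. All names here are hypothetical sketches, not the real client.
public class SnapshotClient {

    private final List<String> log;
    private int position = 0;  // previously saved consume position

    public SnapshotClient(List<String> log) {
        this.log = log;
    }

    /**
     * Finite "retrieveAll" semantics: snapshot the end offset up front and
     * deliver each message exactly once, completing at that snapshot point
     * even if producers append more messages afterwards.
     */
    public void retrieveAll(Consumer<String> subscriber) {
        int endOffset = log.size();  // snapshot of the log end offset
        while (position < endOffset) {
            subscriber.accept(log.get(position));
            position++;
        }
        // Stream completes here; an infinite retrieve() would keep polling.
    }

    public int position() {
        return position;
    }

    public static void main(String[] args) {
        SnapshotClient client =
                new SnapshotClient(new ArrayList<>(List.of("a", "b", "c")));
        List<String> snapshot = new ArrayList<>();
        client.retrieveAll(snapshot::add);
        System.out.println(snapshot);  // [a, b, c]
    }
}
```

The demand/backpressure mechanics of a real Subscriber are elided; only the "finite, exactly once, up to the saved end offset" behaviour is shown.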



> Make logEndOffset available in the 0.8.3 Consumer
> -------------------------------------------------
>
>                 Key: KAFKA-2500
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2500
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>    Affects Versions: 0.8.3
>            Reporter: Will Funnell
>            Assignee: Jason Gustafson
>            Priority: Critical
>             Fix For: 0.8.3
>
>
> Originally created in the old consumer here: https://issues.apache.org/jira/browse/KAFKA-1977
> The requirement is to create a snapshot from the Kafka topic but NOT do continual reads after that point. For example you might be creating a backup of the data to a file.
> This ticket covers the addition of the functionality to the new consumer.
> In order to achieve that, a solution recommended by Joel Koshy and Jay Kreps was to expose the high watermark, as maxEndOffset, from the FetchResponse object through to each MessageAndMetadata object, so that the consumer can tell when it has reached the end of each partition.
> The submitted patch achieves this by adding the maxEndOffset to the PartitionTopicInfo, which is updated when a new message arrives in the ConsumerFetcherThread and then exposed in MessageAndMetadata.
> See here for discussion:
> http://search-hadoop.com/m/4TaT4TpJy71
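The approach described above can be sketched as follows: carry the partition high watermark on each consumed message and treat the snapshot as complete once every partition's consumed offset has reached its high watermark. This is a minimal illustration with hypothetical names; the actual patch threads maxEndOffset from FetchResponse through PartitionTopicInfo into MessageAndMetadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Message stands in for MessageAndMetadata, carrying the
// maxEndOffset (high watermark) observed in the fetch response that served it.
public class SnapshotTracker {

    record Message(int partition, long offset, long maxEndOffset) {}

    private final Map<Integer, Boolean> caughtUp = new HashMap<>();

    public SnapshotTracker(int... partitions) {
        for (int p : partitions) caughtUp.put(p, false);
    }

    /** Record one consumed message and note whether its partition is drained. */
    public void onMessage(Message m) {
        // The next fetch offset is m.offset() + 1; the partition is caught up
        // once that reaches the high watermark from the fetch response.
        caughtUp.put(m.partition(), m.offset() + 1 >= m.maxEndOffset());
    }

    /** True once every partition has been read up to its high watermark. */
    public boolean snapshotComplete() {
        return caughtUp.values().stream().allMatch(Boolean::booleanValue);
    }
}
```

A snapshot consumer would feed every message through onMessage and stop reading when snapshotComplete() first returns true, which is exactly the "read to the end, then stop" behaviour the ticket asks for.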



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)