You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Maria Abramiuc <ma...@gmail.com> on 2016/04/21 15:19:20 UTC

Kafka Streams - read topic from beginning

Kafka Streams look great, but there is one thing I don't seem to find a way
to do:

- read a topic from beginning even if there is a offset saved:

         I have :

             props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        this works as described if there is no offset save.

      For a normal consumer we have:

              seekToBeginning


In KafkaConsumer:

Line 132:

NetworkClient netClient = new NetworkClient(new
Selector(config.getLong("connections.max.idle.ms").longValue(),
this.metrics, this.time, metricGrpPrefix, metricsTags,
channelBuilder), this.metadata, this.clientId, 100,
config.getLong("reconnect.backoff.ms").longValue(),
config.getInt("send.buffer.bytes").intValue(),
config.getInt("receive.buffer.bytes").intValue(),
config.getInt("request.timeout.ms").intValue(), this.time);
this.client = new ConsumerNetworkClient(netClient, this.metadata,
this.time, this.retryBackoffMs);
OffsetResetStrategy offsetResetStrategy =
OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase());
this.subscriptions = new SubscriptionState(offsetResetStrategy);

I can't find any way to set the consumer using StreamsConfig
properties to seekToBeginning or to set subscriptionState to need
offset reset.



*Is there a way to force the consumption of a topic from begging using
Kafka Streams?*


Thank you for all the support provided,

Maria Abramiuc

Re: Kafka Streams - read topic from beginning

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Maria,

We have some thoughts about supporting finer grained flow controls in Kafka
Streams https://issues.apache.org/jira/browse/KAFKA-3478 as part of a big
effort to improve re-processing user experience, which covers this use
case. We are shooting to have this post 0.10.0.0.

As for now, one work-around I can think of is that upon restart /
re-processing, you can delete the offsets through an admin request (look at
ConsumerGroupCommand).


Guozhang


On Thu, Apr 21, 2016 at 6:19 AM, Maria Abramiuc <ma...@gmail.com>
wrote:

> Kafka Streams look great, but there is one thing I don't seem to find a way
> to do:
>
> - read a topic from beginning even if there is a offset saved:
>
>          I have :
>
>              props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
>         this works as described if there is no offset save.
>
>       For a normal consumer we have:
>
>               seekToBeginning
>
>
> In KafkaConsumer:
>
> Line 132:
>
> NetworkClient netClient = new NetworkClient(new
> Selector(config.getLong("connections.max.idle.ms").longValue(),
> this.metrics, this.time, metricGrpPrefix, metricsTags,
> channelBuilder), this.metadata, this.clientId, 100,
> config.getLong("reconnect.backoff.ms").longValue(),
> config.getInt("send.buffer.bytes").intValue(),
> config.getInt("receive.buffer.bytes").intValue(),
> config.getInt("request.timeout.ms").intValue(), this.time);
> this.client = new ConsumerNetworkClient(netClient, this.metadata,
> this.time, this.retryBackoffMs);
> OffsetResetStrategy offsetResetStrategy =
>
> OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase());
> this.subscriptions = new SubscriptionState(offsetResetStrategy);
>
> I can't find any way to set the consumer using StreamsConfig
> properties to seekToBeginning or to set subscriptionState to need
> offset reset.
>
>
>
> *Is there a way to force the consumption of a topic from begging using
> Kafka Streams?*
>
>
> Thank you for all the support provided,
>
> Maria Abramiuc
>



-- 
-- Guozhang