Posted to users@kafka.apache.org by Sachin Mittal <sj...@gmail.com> on 2016/11/21 10:37:56 UTC

How to get and reset Kafka stream application's offsets

Hi
I am running a streaming application with
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");

How do I find out the offsets for each of the source, intermediate and
internal topics associated with this application?

And how can I reset them to some specific value, via the shell or otherwise?

Thanks
Sachin

Re: How to get and reset Kafka stream application's offsets

Posted by "Matthias J. Sax" <ma...@confluent.io>.
One more thing to add:

You could build your own tool to set specific offsets for each partition.
The mentioned reset tool does something similar, setting the offset for
all partitions to zero. You could copy its code and extend it
to set arbitrary offsets for each partition.

However, auto.offset.reset should cover most use cases
out of the box.
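
As a rough illustration of the first step such a custom tool might take, here is a minimal sketch that turns a user-supplied list of per-partition targets into a map. The class name OffsetPlan and the "partition=offset" spec format are made up for this example and are not part of Kafka or of the reset tool. A real tool would then commit these offsets under the application's consumer group (Kafka Streams uses the application.id as the group id), for instance through the consumer's commitSync(Map<TopicPartition, OffsetAndMetadata>), presumably while the application is stopped. That part is left out so the sketch runs without a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for a custom reset tool; not part of Kafka. */
final class OffsetPlan {

    /**
     * Parses a spec such as "0=1500,1=0,2=730" into partition -> target offset.
     * The spec format is an assumption made for this sketch.
     */
    public static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> plan = new LinkedHashMap<>();
        for (String entry : spec.split(",")) {
            String[] kv = entry.trim().split("=");
            if (kv.length != 2) {
                throw new IllegalArgumentException("expected partition=offset, got: " + entry);
            }
            int partition = Integer.parseInt(kv[0].trim());
            long offset = Long.parseLong(kv[1].trim());
            // Partition numbers and offsets are never negative.
            if (partition < 0 || offset < 0) {
                throw new IllegalArgumentException("negative value in: " + entry);
            }
            plan.put(partition, offset);
        }
        return plan;
    }

    public static void main(String[] args) {
        // Prints the parsed plan in insertion order.
        System.out.println(parse("0=1500, 1=0, 2=730"));
    }
}
```

Keeping the parsing separate from the commit step makes it easy to validate the requested offsets before anything is written to the group.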

-Matthias

On 11/21/16 5:38 AM, Eno Thereska wrote:
> Hi Sachin,
> 
> There is no need to check within the app whether the offset exists or not, since the consumer code will do that check automatically for you. So in practice an app will include the property below (e.g., set to "earliest"), but it only takes effect if the consumer detects that the offsets do not exist anymore. If the offsets exist, then that line is a no-op.
> 
> So in summary, I'd just include that property, and no more code changes are required.
> 
> Thanks
> Eno
> 


Re: How to get and reset Kafka stream application's offsets

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Exactly.


On 12/28/16 10:47 AM, Sachin Mittal wrote:
> Understood. So if we want it to start consuming from the earliest offset, we should add
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> So when we start the app for the first time, it will start from the earliest
> offset. Later, when we stop the app and restart it, it will resume from the
> last committed offset.
> (So there is no need to comment out this line before restarting.)
> 
> Hence the earliest (or latest) offset is used only when no committed offsets are found
> for that consumer group.
> 
> Thanks
> Sachin
> 
> 


Re: How to get and reset Kafka stream application's offsets

Posted by Sachin Mittal <sj...@gmail.com>.
Understood. So if we want it to start consuming from the earliest offset, we should add
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

So when we start the app for the first time, it will start from the earliest
offset. Later, when we stop the app and restart it, it will resume from the
last committed offset.
(So there is no need to comment out this line before restarting.)

Hence the earliest (or latest) offset is used only when no committed offsets are found
for that consumer group.

Thanks
Sachin


On Wed, Dec 28, 2016 at 3:02 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi Sachin,
>
> What do you mean by "with this commented"? Did you set auto.offset.reset
> to "earliest" or not? The default value is "latest", and if you do not set it
> to "earliest", the application will start consuming from the
> end of the topic if no committed offsets are found.
>
> For default values of Kafka Streams parameters see
> http://docs.confluent.io/3.0.1/streams/developer-guide.html#configuring-a-kafka-streams-application
>
> For default consumer parameters see
> http://kafka.apache.org/0100/documentation.html#newconsumerconfigs
>
>
> -Matthias
>
>

Re: How to get and reset Kafka stream application's offsets

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi Sachin,

What do you mean by "with this commented"? Did you set auto.offset.reset
to "earliest" or not? The default value is "latest", and if you do not set it
to "earliest", the application will start consuming from the
end of the topic if no committed offsets are found.

For default values of Kafka Streams parameters see
http://docs.confluent.io/3.0.1/streams/developer-guide.html#configuring-a-kafka-streams-application

For default consumer parameters see
http://kafka.apache.org/0100/documentation.html#newconsumerconfigs


-Matthias


On 12/27/16 3:27 PM, Sachin Mittal wrote:
> Hi,
> I started my new streams app with this commented
> //props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> 
> What I observed was that it started consuming messages from the latest offset
> of the source topic.
> Based on Eno's comments, I thought it would start from the earliest offset if
> no offsets exist:
> 
> So in practice an app will include the property below (e.g., set to
> "earliest")
> 
> However, it picked the latest offset. I also checked the auto.offset.reset
> property in the latest docs, and I see it listed as "largest".
> 
> So now I am confused: in a streams application, what is the default offset
> if none exists?
> 
> Note: I am using version kafka_2.10-0.10.0.1.
> 
> Thanks
> Sachin
> 
> 
> 


Re: How to get and reset Kafka stream application's offsets

Posted by Sachin Mittal <sj...@gmail.com>.
Hi,
I started my new streams app with this commented
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

What I observed was that it started consuming messages from the latest offset
of the source topic.
Based on Eno's comments, I thought it would start from the earliest offset if
no offsets exist:

So in practice an app will include the property below (e.g., set to
"earliest")

However, it picked the latest offset. I also checked the auto.offset.reset
property in the latest docs, and I see it listed as "largest".

So now I am confused: in a streams application, what is the default offset
if none exists?

Note: I am using version kafka_2.10-0.10.0.1.

Thanks
Sachin



On Mon, Nov 21, 2016 at 7:08 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Sachin,
>
> There is no need to check within the app whether the offset exists or not,
> since the consumer code will do that check automatically for you. So in
> practice an app will include the property below (e.g., set to "earliest"),
> but that will only have an effect if the consumer detects that the offsets
> do not exist anymore. If the offsets exist, then that line is a no-op.
>
> So in summary, I'd just include that property, and no more code changes
> are required.
>
> Thanks
> Eno
>

Re: How to get and reset Kafka stream application's offsets

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

There is no need to check within the app whether the offset exists or not, since the consumer code will do that check automatically for you. So in practice an app will include the property below (e.g., set to "earliest"), but it only takes effect if the consumer detects that the offsets do not exist anymore. If the offsets exist, then that line is a no-op.

So in summary, I'd just include that property, and no more code changes are required.
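
To make that rule concrete, here is a small toy model of the decision. It is a sketch only and not Kafka's actual consumer code. The class and method names are invented for illustration, and it covers only the two policy values discussed in this thread.

```java
import java.util.OptionalLong;

/** Toy model of the start-position rule described above; not Kafka's actual code. */
final class OffsetResetModel {

    /**
     * @param committed last committed offset for the group, empty if none exists
     * @param logStart  earliest offset still available on the broker
     * @param logEnd    offset the next produced record would receive
     * @param policy    value of auto.offset.reset: "earliest" or "latest"
     */
    public static long startPosition(OptionalLong committed, long logStart,
                                     long logEnd, String policy) {
        // A committed offset that still points into the log wins; the policy is a no-op.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // No usable offset: fall back to the configured policy.
        switch (policy) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            default: throw new IllegalArgumentException("unsupported policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // First start, nothing committed yet: the policy decides.
        System.out.println(startPosition(OptionalLong.empty(), 0L, 100L, "earliest"));
        System.out.println(startPosition(OptionalLong.empty(), 0L, 100L, "latest"));
        // Restart with committed offset 42: the policy is ignored.
        System.out.println(startPosition(OptionalLong.of(42L), 0L, 100L, "earliest"));
        // Committed offset 5 has been deleted (log now starts at 20): the policy applies again.
        System.out.println(startPosition(OptionalLong.of(5L), 20L, 100L, "earliest"));
    }
}
```

The third call is the restart case: once an offset has been committed, leaving the property in place changes nothing.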

Thanks
Eno

> On 21 Nov 2016, at 12:11, Sachin Mittal <sj...@gmail.com> wrote:
> 
> So in my Java code, how can I check
> whether there is no initial offset in Kafka, or whether the current offset no
> longer exists on the server (e.g. because that data has been deleted)?
> 
> In that case, as you said, I can set the reset policy with
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest
> 
> Thanks
> Sachin
> 
> 


Re: How to get and reset Kafka stream application's offsets

Posted by Sachin Mittal <sj...@gmail.com>.
So in my Java code, how can I check
whether there is no initial offset in Kafka, or whether the current offset no
longer exists on the server (e.g. because that data has been deleted)?

In that case, as you said, I can set the reset policy with
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest

Thanks
Sachin


On Mon, Nov 21, 2016 at 4:16 PM, Eno Thereska <en...@gmail.com>
wrote:

> Hi Sachin,
>
> Currently you can only set the following global configuration, to either
> "earliest" or "latest", as shown in the code snippet below. As the Javadoc
> mentions: "What to do when there is no initial offset in Kafka or if the
> current offset does not exist any more on the server (e.g. because that
> data has been deleted)":
>
> ...
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> ...
> return new KafkaStreams(builder, props);
>
> In addition, there is a tool to reset the offsets of all topics to the
> beginning. This is useful for reprocessing:
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
> However, there is no option currently for resetting the offset to an
> arbitrary offset.
>
> Thanks
> Eno
>

Re: How to get and reset Kafka stream application's offsets

Posted by Eno Thereska <en...@gmail.com>.
Hi Sachin,

Currently you can only set the following global configuration, to either "earliest" or "latest", as shown in the code snippet below. As the Javadoc mentions: "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted)":

...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
...
return new KafkaStreams(builder, props);

In addition, there is a tool to reset the offsets of all topics to the beginning. This is useful for reprocessing: https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

However, there is no option currently for resetting the offset to an arbitrary offset.

Thanks
Eno

> On 21 Nov 2016, at 10:37, Sachin Mittal <sj...@gmail.com> wrote:
> 
> Hi
> I am running a streaming application with
> streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> 
> How do I find out the offsets for each of the source, intermediate and
> internal topics associated with this application?
> 
> And how can I reset them to some specific value, via the shell or otherwise?
> 
> Thanks
> Sachin