Posted to dev@kafka.apache.org by Nick Telford <ni...@gmail.com> on 2022/11/11 14:47:55 UTC

Streams: clarification needed, checkpoint vs. position files

Hi everyone,

I'm trying to understand how StateStores work internally for some changes
that I plan to propose, and I'd like some clarification around checkpoint
files and position files.

It appears as though position files are relatively new, and were created as
part of the IQv2 initiative, as a means to track the position of the local
state store so that reads could be bound by particular positions?
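To make the idea of a read "bound by a position" concrete, here is a rough sketch. IQv2 does have `Position` and `PositionBound` types, but the class below is a hypothetical, simplified stand-in written for illustration and does not use the Kafka API. It assumes that a position maps each input topic-partition to the latest offset reflected in the store, and that a bounded read may be served only once the store's position has reached every component of the bound.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for an IQv2-style position (not Kafka's Position class):
// input topic -> (partition -> latest input offset reflected in the store).
final class SimplePosition {
    private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

    // Record that the store now reflects input up to `offset`; positions only move forward.
    SimplePosition with(String topic, int partition, long offset) {
        offsets.computeIfAbsent(topic, t -> new HashMap<>())
               .merge(partition, offset, Long::max);
        return this;
    }

    // Latest offset for a topic-partition, or -1 if the store has seen nothing from it.
    long get(String topic, int partition) {
        Map<Integer, Long> partitions = offsets.get(topic);
        if (partitions == null) {
            return -1L;
        }
        return partitions.getOrDefault(partition, -1L);
    }

    // Assumed rule: a bounded read may be served only if this position has reached
    // (is >= to) every topic-partition offset named in the bound.
    boolean satisfies(SimplePosition bound) {
        for (Map.Entry<String, Map<Integer, Long>> topic : bound.offsets.entrySet()) {
            for (Map.Entry<Integer, Long> part : topic.getValue().entrySet()) {
                if (get(topic.getKey(), part.getKey()) < part.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

For example, a store whose position is offset 41 on partition 0 of a hypothetical `orders` topic would not satisfy a bound at offset 42 until it has applied that input record.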

Checkpoint files look much older, and are managed by the Task itself
(actually, ProcessorStateManager). It looks like they are used exclusively
for determining a) whether to restore a store, and b) which offsets to
restore from?
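A minimal sketch of the two decisions described above (whether to restore, and from which offset), assuming the checkpoint records the offset of the last changelog record already applied to the store. `RestorePlanner` and its parameters are hypothetical, and the exact offset convention used by ProcessorStateManager is not stated in this thread.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch (not ProcessorStateManager's code) of deciding whether, and from where,
// to restore a store from its changelog.
// Assumption: the checkpoint maps each changelog partition to the offset of the last
// changelog record already applied to the local store.
final class RestorePlanner {

    /**
     * @param checkpoint         changelog partition -> last applied offset (may lack the entry)
     * @param changelogPartition the changelog partition backing the store
     * @param logStartOffset     earliest offset still available in the changelog
     * @param logEndOffset       offset the next record appended to the changelog would receive
     * @return the offset to start restoring from, or empty if the store is already up to date
     */
    static OptionalLong restoreStartOffset(Map<String, Long> checkpoint,
                                           String changelogPartition,
                                           long logStartOffset,
                                           long logEndOffset) {
        Long lastApplied = checkpoint.get(changelogPartition);
        // No checkpoint entry: treat local state as untrusted and replay everything still available.
        long start = (lastApplied == null) ? logStartOffset : lastApplied + 1;
        // Nothing left to replay once the start offset has reached the end of the log.
        return start < logEndOffset ? OptionalLong.of(start) : OptionalLong.empty();
    }
}
```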

If I've understood the above correctly, is there any scope to potentially
replace checkpoint files with StateStore#position()?

Regards,

Nick

Re: Streams: clarification needed, checkpoint vs. position files

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Ah, good thing John showed up to clear things up -- sorry for the misinformation
in my first reply (fake news!)

Generally speaking, only public APIs *need* to be included in the KIP document;
anything internal is an implementation detail, as you said. In some cases it
may still be appropriate to touch on the actual implementation to fully
describe the KIP, but you don't need to worry about that unless the KIP is
making drastic changes to the internal architecture or the implementation has
some effect on the final semantics that users see, etc.

On Mon, Nov 14, 2022 at 9:15 AM Nick Telford <ni...@gmail.com> wrote:

> Thank you both, the key point I was missing was that position files track
> the offsets of the *source* topics, whereas the checkpoint file tracks the
> offset(s) of the changelog topics.
>
> Do you know if I need to include interface/API changes to internal
> classes/interfaces (those under the
> org.apache.kafka.streams.processor.internals package) in a KIP, or are they
> considered implementation details?
>
> Cheers,
> Nick

Re: Streams: clarification needed, checkpoint vs. position files

Posted by Nick Telford <ni...@gmail.com>.
Thank you both, the key point I was missing was that position files track
the offsets of the *source* topics, whereas the checkpoint file tracks the
offset(s) of the changelog topics.

Do you know if I need to include interface/API changes to internal
classes/interfaces (those under the
org.apache.kafka.streams.processor.internals package) in a KIP, or are they
considered implementation details?

Cheers,
Nick

On Sat, 12 Nov 2022 at 03:59, John Roesler <vv...@apache.org> wrote:

> Hi all,
>
> Just to clarify: there actually is a position file. It was a small detail
> of the IQv2 implementation to add it, otherwise a persistent store's
> position would be lost after a restart.
>
> Otherwise, Sophie is right on the money. The checkpoint refers to an
> offset in the changelog, while the position refers to offsets in the task's
> input topics. So they are similar in function and structure, but
> they refer to two different things.
>
> I agree that, given this, it doesn't seem like consolidating them (for
> example, into one file) would be worth it. It would make the code more
> complicated without deduping any information.
>
> I hope this helps, and look forward to what you're cooking up, Nick!
> -John

Re: Streams: clarification needed, checkpoint vs. position files

Posted by John Roesler <vv...@apache.org>.
Hi all,

Just to clarify: there actually is a position file. It was a small detail of the IQv2 implementation to add it; otherwise, a persistent store's position would be lost after a restart.
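As a hedged illustration of why persisting the position matters, here is a sketch that writes an in-memory position to disk and reads it back after a "restart". `PositionFile` and the one-entry-per-line `topic partition offset` format are assumptions made for the example, not necessarily the format Kafka Streams uses for its position file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: persisting a store's position (input topic -> partition -> offset)
// so it survives a restart. The "topic partition offset" line format is an assumption.
final class PositionFile {

    static void write(Path file, Map<String, Map<Integer, Long>> position) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, Long>> topic : position.entrySet()) {
            for (Map.Entry<Integer, Long> part : topic.getValue().entrySet()) {
                lines.add(topic.getKey() + " " + part.getKey() + " " + part.getValue());
            }
        }
        // Write a temp file first, so the old file is only replaced once the new one is complete.
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        try {
            Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    static Map<String, Map<Integer, Long>> read(Path file) throws IOException {
        Map<String, Map<Integer, Long>> position = new TreeMap<>();
        if (!Files.exists(file)) {
            return position; // No file yet: start from an empty position.
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] parts = trimmed.split(" ");
            position.computeIfAbsent(parts[0], t -> new TreeMap<>())
                    .put(Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        return position;
    }
}
```

Without such a file, a persistent store reopened after a restart would come back with an empty position even though its data still reflects earlier input.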

Otherwise, Sophie is right on the money. The checkpoint refers to an offset in the changelog, while the position refers to offsets in the task's input topics. So they are similar in function and structure, but they refer to two different things.
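The distinction drawn here can be sketched with two maps keyed by different partitions. `StoreBookkeeping`, `onWrite`, and the partition names used with it are hypothetical; a real implementation would typically persist the checkpoint periodically (for example, on commit) rather than track it on every write.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two pieces of bookkeeping; not Kafka Streams internals.
final class StoreBookkeeping {
    // Position: input (source) topic-partition -> offset of the last input record reflected in the store.
    final Map<String, Long> position = new HashMap<>();
    // Checkpoint: changelog topic-partition -> offset of the last changelog record the store contains.
    final Map<String, Long> checkpoint = new HashMap<>();

    // Called after an input record has been processed and its store write logged to the changelog.
    void onWrite(String inputPartition, long inputOffset,
                 String changelogPartition, long changelogOffset) {
        position.merge(inputPartition, inputOffset, Long::max);
        checkpoint.merge(changelogPartition, changelogOffset, Long::max);
    }
}
```

Because an input record may lead to zero, one, or several changelog records, the two offsets advance independently and neither can be derived from the other, which is consistent with the point that merging them into one file would not deduplicate any information.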

I agree that, given this, it doesn't seem like consolidating them (for example, into one file) would be worth it. It would make the code more complicated without deduping any information.

I hope this helps, and look forward to what you're cooking up, Nick!
-John

On 2022/11/12 00:50:27 Sophie Blee-Goldman wrote:
> Hey Nick,
> 
> I haven't been following the new IQv2 work very closely, so take this with a
> grain of salt, but as far as I'm aware there's no such thing as "position
> files" -- the Position is just an in-memory object and is related to a user's
> query against the state store, whereas a checkpoint file reflects the current
> state of the store, i.e. how much of the changelog it contains.
> 
> In other words, while these might look like they do similar things, the
> actual usage and implementation of Positions vs. checkpoint files is pretty
> much unrelated. So I don't think it would make sense for Streams to try to
> consolidate these or replace one with another.
> 
> Hope this answers your question, and I'll ping John to make sure I'm not
> misleading you regarding the usage/intention of Positions.
> 
> Sophie

Re: Streams: clarification needed, checkpoint vs. position files

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Hey Nick,

I haven't been following the new IQv2 work very closely, so take this with a
grain of salt, but as far as I'm aware there's no such thing as "position
files" -- the Position is just an in-memory object and is related to a user's
query against the state store, whereas a checkpoint file reflects the current
state of the store, i.e. how much of the changelog it contains.

In other words, while these might look like they do similar things, the
actual usage and implementation of Positions vs. checkpoint files is pretty
much unrelated. So I don't think it would make sense for Streams to try to
consolidate these or replace one with another.

Hope this answers your question, and I'll ping John to make sure I'm not
misleading you regarding the usage/intention of Positions.

Sophie

On Fri, Nov 11, 2022 at 6:48 AM Nick Telford <ni...@gmail.com> wrote:

> Hi everyone,
>
> I'm trying to understand how StateStores work internally for some changes
> that I plan to propose, and I'd like some clarification around checkpoint
> files and position files.
>
> It appears as though position files are relatively new, and were created as
> part of the IQv2 initiative, as a means to track the position of the local
> state store so that reads could be bound by particular positions?
>
> Checkpoint files look much older, and are managed by the Task itself
> (actually, ProcessorStateManager). It looks like they are used exclusively
> for determining a) whether to restore a store, and b) which offsets to
> restore from?
>
> If I've understood the above correctly, is there any scope to potentially
> replace checkpoint files with StateStore#position()?
>
> Regards,
>
> Nick
>