Posted to dev@kafka.apache.org by Jorge Esteban Quilcate Otoya <qu...@gmail.com> on 2020/08/28 18:31:40 UTC

[DISCUSS] KIP-666: Add Instant-based methods to ReadOnlySessionStore

Hi everyone,

I'd like to discuss the following proposal to align IQ Session Store API
with the Window Store one.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-666%3A+Add+Instant-based+methods+to+ReadOnlySessionStore

Looking forward to your feedback.

Cheers,
Jorge.

Re: [DISCUSS] KIP-666: Add Instant-based methods to ReadOnlySessionStore

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks John!

PS. vote thread started
https://lists.apache.org/thread.html/r5863299712063d4cb4be139ce4ab0533ce66efe04c1f7e993666b09c%40%3Cdev.kafka.apache.org%3E

On Sat, Sep 19, 2020 at 2:46 AM John Roesler <vv...@apache.org> wrote:

> Thanks for the KIP, Jorge!
>
> Sorry it took me so long. I just reviewed the KIP, and it looks good to
> me.
>
> Thanks,
> John
>
> On Tue, Sep 1, 2020, at 12:35, Jorge Esteban Quilcate Otoya wrote:
> > Thanks Sophie!
> >
> > > one nit: you missed updating the startTime long to Instant in both
> > > appearances of the fetchSession(key, startTime, sessionEndTime) method.
> >
> > Agreed. I'm fixing this on the KIP.
> >
> > > Also, I think by "startTime" you actually meant "earliestSessionEndTime".
> >
> > Given the implementation usage of these variables, I think it refers to
> > session start and end time, e.g. in `InMemorySessionStore`:
> >
> > ```
> >     public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
> >         removeExpiredSegments();
> >
> >         Objects.requireNonNull(key, "key cannot be null");
> >
> >         // Only need to search if the record hasn't expired yet
> >         if (endTime > observedStreamTime - retentionPeriod) {
> >             final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
> >             if (keyMap != null) {
> >                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
> >                 if (startTimeMap != null) {
> >                     return startTimeMap.get(startTime);
> >                 }
> >             }
> >         }
> >         return null;
> >     }
> > ```
> >
> > > One question I do have is whether we really need to provide a default implementation
> > > that throws UnsupportedOperationException? Actually I'm wondering if we shouldn't
> > > do something similar to the WindowStore methods, and provide a default implementation
> > > on the SessionStore interface which then just calls the corresponding long-based method.
> >
> > I'll be happy with this approach. I was trying to align it with the
> > approach we followed on KIP-617, but you're right in the case of
> > WindowStore we didn't add default implementations. I'll update the KIP
> > accordingly and wait for more feedback.
> >
> > Cheers,
> > Jorge.
> >
> >
> >
> > On Tue, Sep 1, 2020 at 4:51 AM Sophie Blee-Goldman <so...@confluent.io> wrote:
> >
> > > Thanks for bringing the IQ API into alignment -- the proposal looks good, although
> > > one nit: you missed updating the startTime long to Instant in both appearances of
> > > the fetchSession(key, startTime, sessionEndTime) method. Also, I think by "startTime"
> > > you actually meant "earliestSessionEndTime".
> > >
> > > One question I do have is whether we really need to provide a default implementation
> > > that throws UnsupportedOperationException? Actually I'm wondering if we shouldn't
> > > do something similar to the WindowStore methods, and provide a default implementation
> > > on the SessionStore interface which then just calls the corresponding long-based method.
> > > WDYT?
> > >
> > > -Sophie
> > >
> > > On Fri, Aug 28, 2020 at 11:31 AM Jorge Esteban Quilcate Otoya <quilcate.jorge@gmail.com> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'd like to discuss the following proposal to align IQ Session Store API
> > > > with the Window Store one.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-666%3A+Add+Instant-based+methods+to+ReadOnlySessionStore
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > Cheers,
> > > > Jorge.
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-666: Add Instant-based methods to ReadOnlySessionStore

Posted by John Roesler <vv...@apache.org>.
Thanks for the KIP, Jorge!

Sorry it took me so long. I just reviewed the KIP, and it looks good to me. 

Thanks,
John


Re: [DISCUSS] KIP-666: Add Instant-based methods to ReadOnlySessionStore

Posted by Jorge Esteban Quilcate Otoya <qu...@gmail.com>.
Thanks Sophie!

> one nit: you missed updating the startTime long to Instant in both
> appearances of the fetchSession(key, startTime, sessionEndTime) method.

Agreed. I'm fixing this on the KIP.

> Also, I think by "startTime" you actually meant "earliestSessionEndTime".

Given the implementation usage of these variables, I think it refers to
session start and end time, e.g. in `InMemorySessionStore`:

```
    public byte[] fetchSession(final Bytes key, final long startTime, final long endTime) {
        removeExpiredSegments();

        Objects.requireNonNull(key, "key cannot be null");

        // Only need to search if the record hasn't expired yet
        if (endTime > observedStreamTime - retentionPeriod) {
            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
            if (keyMap != null) {
                final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                if (startTimeMap != null) {
                    return startTimeMap.get(startTime);
                }
            }
        }
        return null;
    }
```
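
To make the roles of the two timestamps concrete, here is a small self-contained model of the same lookup. It is only a sketch under stated assumptions: `NestedSessionIndex` is a hypothetical class, `String` stands in for `Bytes` and `byte[]`, and expiry is reduced to the single retention comparison (nothing is actually evicted). The index is keyed by session end time, then by key, then by session start time.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified model of the quoted lookup; not the Kafka class.
class NestedSessionIndex {
    // Layout: session end time -> key -> session start time -> value
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, String>>> endTimeMap =
        new ConcurrentSkipListMap<>();
    private final long retentionPeriod;
    private long observedStreamTime = 0L; // assumption: stream time starts at 0 in this sketch

    NestedSessionIndex(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    void put(final String key, final long startTime, final long endTime, final String value) {
        // Stream time only moves forward, driven by the largest end time seen.
        observedStreamTime = Math.max(observedStreamTime, endTime);
        endTimeMap
            .computeIfAbsent(endTime, t -> new ConcurrentSkipListMap<>())
            .computeIfAbsent(key, k -> new ConcurrentSkipListMap<>())
            .put(startTime, value);
    }

    String fetchSession(final String key, final long startTime, final long endTime) {
        // Sessions whose end time fell out of the retention window are treated as absent.
        if (endTime <= observedStreamTime - retentionPeriod) {
            return null;
        }
        final ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, String>> keyMap = endTimeMap.get(endTime);
        if (keyMap == null) {
            return null;
        }
        final ConcurrentNavigableMap<Long, String> startTimeMap = keyMap.get(key);
        return startTimeMap == null ? null : startTimeMap.get(startTime);
    }
}
```

In this model both arguments must match a stored session exactly: `startTime` selects the innermost entry and `endTime` selects the outermost one, which is why they read as session start and session end rather than as a search range.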

> One question I do have is whether we really need to provide a default implementation
> that throws UnsupportedOperationException? Actually I'm wondering if we shouldn't
> do something similar to the WindowStore methods, and provide a default implementation
> on the SessionStore interface which then just calls the corresponding long-based method.

I'll be happy with this approach. I was trying to align it with the
approach we followed on KIP-617, but you're right in the case of
WindowStore we didn't add default implementations. I'll update the KIP
accordingly and wait for more feedback.
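
For illustration, the delegating-default idea could look roughly like the sketch below. The names `SketchSessionStore` and `MapBackedSessionStore` are hypothetical and do not reproduce the Kafka Streams interfaces or the final KIP signatures; the only real API relied on is `Instant.toEpochMilli()`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for a session store interface.
interface SketchSessionStore<K, V> {

    // Existing long-based lookup, timestamps in epoch milliseconds.
    V fetchSession(K key, long sessionStartTime, long sessionEndTime);

    // Instant-based overload as a default method: it converts and delegates,
    // so implementations that only know the long-based method keep working.
    default V fetchSession(final K key, final Instant sessionStartTime, final Instant sessionEndTime) {
        Objects.requireNonNull(sessionStartTime, "sessionStartTime cannot be null");
        Objects.requireNonNull(sessionEndTime, "sessionEndTime cannot be null");
        return fetchSession(key, sessionStartTime.toEpochMilli(), sessionEndTime.toEpochMilli());
    }
}

// Toy implementation that overrides only the long-based method.
class MapBackedSessionStore implements SketchSessionStore<String, String> {
    private final Map<String, String> sessions = new HashMap<>();

    void put(final String key, final long start, final long end, final String value) {
        sessions.put(key + "@" + start + "-" + end, value);
    }

    @Override
    public String fetchSession(final String key, final long sessionStartTime, final long sessionEndTime) {
        return sessions.get(key + "@" + sessionStartTime + "-" + sessionEndTime);
    }
}
```

With this shape, a caller can pass `Instant.ofEpochMilli(...)` values to a `MapBackedSessionStore` without that class declaring an Instant-based method, whereas a default that throws UnsupportedOperationException would force every implementation to add one before the new overload is usable.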

Cheers,
Jorge.




Re: [DISCUSS] KIP-666: Add Instant-based methods to ReadOnlySessionStore

Posted by Sophie Blee-Goldman <so...@confluent.io>.
Thanks for bringing the IQ API into alignment -- the proposal looks good, although
one nit: you missed updating the startTime long to Instant in both appearances of
the fetchSession(key, startTime, sessionEndTime) method. Also, I think by "startTime"
you actually meant "earliestSessionEndTime".

One question I do have is whether we really need to provide a default implementation
that throws UnsupportedOperationException? Actually I'm wondering if we shouldn't
do something similar to the WindowStore methods, and provide a default implementation
on the SessionStore interface which then just calls the corresponding long-based method.
WDYT?

-Sophie
