Posted to dev@kafka.apache.org by Bill Bejeck <bb...@gmail.com> on 2017/06/01 21:04:01 UTC

[DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

All,

I'd like to start the discussion on adding bulk-add functionality for
restoring a state store.  The KIP can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+a+restoreAll+method+to+StateRestoreCallback

Thanks,
Bill
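
To make the bulk-add idea concrete, here is a minimal Java sketch. It is
not the KIP's API: RecordRestoreCallback, BulkRestoreCallback and
CountingStore are hypothetical stand-ins defined only for this example,
and the 5000-record / 1000-per-batch figures are purely illustrative. It
contrasts one store write per record with one store write per batch.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a single-record restore callback.
interface RecordRestoreCallback {
    void restore(byte[] key, byte[] value);
}

// Hypothetical bulk variant: one call per batch of changelog records.
interface BulkRestoreCallback extends RecordRestoreCallback {
    void restoreAll(List<Map.Entry<byte[], byte[]>> records);
}

// Toy in-memory store that counts how many write operations a restore costs.
class CountingStore implements BulkRestoreCallback {
    final List<Map.Entry<byte[], byte[]>> data = new ArrayList<>();
    int writeCalls = 0;

    @Override
    public void restore(byte[] key, byte[] value) {
        data.add(new SimpleEntry<>(key, value));
        writeCalls++; // one write per record
    }

    @Override
    public void restoreAll(List<Map.Entry<byte[], byte[]>> records) {
        data.addAll(records);
        writeCalls++; // one write per batch
    }
}

public class BulkRestoreSketch {
    // Builds n dummy changelog records (key == value == the index as text).
    static List<Map.Entry<byte[], byte[]>> records(int n) {
        List<Map.Entry<byte[], byte[]>> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            byte[] bytes = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            out.add(new SimpleEntry<>(bytes, bytes));
        }
        return out;
    }

    // Restores `total` records in batches of `batchSize`; returns the write calls used.
    static int restoreInBatches(CountingStore store, int total, int batchSize) {
        List<Map.Entry<byte[], byte[]>> all = records(total);
        for (int from = 0; from < total; from += batchSize) {
            int to = Math.min(from + batchSize, total);
            store.restoreAll(all.subList(from, to));
        }
        return store.writeCalls;
    }

    public static void main(String[] args) {
        CountingStore oneByOne = new CountingStore();
        for (Map.Entry<byte[], byte[]> r : records(5000)) {
            oneByOne.restore(r.getKey(), r.getValue());
        }
        CountingStore batched = new CountingStore();
        restoreInBatches(batched, 5000, 1000);
        System.out.println("per-record writes: " + oneByOne.writeCalls
                + ", per-batch writes: " + batched.writeCalls);
    }
}
```

A real store would replace the in-memory list with whatever batch-write
facility its backend offers; the sketch only illustrates why a
collection-taking method leaves room for that optimization.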

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
All,

I've updated the KIP and I'd like to start a final round of discussion with
an eye towards starting a vote soon, maybe on Monday.

Thanks,
Bill

On Mon, Jun 26, 2017 at 7:26 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hmm... I'm not sure how this can achieve the "closing the store before
> restoration, re-open it with a slightly different config, and then
> close-and-reopen store for query" pattern. You need to be able to access
> the store object in order to do this, right?
>
>
> Guozhang
>
> On Mon, Jun 26, 2017 at 7:40 AM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Thinking about this some more, I have another approach.  Leave the first
> > parameter as a String in the StateRestoreListener interface.
> >
> > But we'll provide 2 default abstract classes one implementing
> > StateRestoreCallback and the other implementing the
> > BatchingStateRestoreCallback.  Both abstract classes will also implement
> > the StateRestoreListener interface with no-op methods provided for the
> > restore progress methods.
> >
> > WDYT?
> >
> > On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bb...@gmail.com> wrote:
> >
> > > Guozhang,
> > >
> > > Thanks for the comments.
> > >
> > > I think that will work, but my concern is it might not be as clear to
> > > users that want to receive external notification of the restore
> > > progress separately (say for reporting purposes) and still send
> > > separate signals to the state store for resource management tasks.
> > >
> > > However, I like this approach better and I have some ideas I can try in
> > > the implementation, so I'll update the KIP accordingly.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > >> More specifically, if we can replace the first parameter from the
> String
> > >> store name to the store instance itself, would that be sufficient to
> > >> cover `
> > >> StateRestoreNotification`?
> > >>
> > >> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wa...@gmail.com>
> > >> wrote:
> > >>
> > >> > Bill,
> > >> >
> > >> > I'm wondering why we need the `StateRestoreNotification` while still
> > >> > having `StateRestoreListener`. Could the above setup be achieved just
> > >> > with `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it
> > >> > seems the latter can subsume any use cases intended for the former API.
> > >> >
> > >> > Guozhang
> > >> >
> > >> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com>
> > wrote:
> > >> >
> > >> >> I'm going to update the KIP with a new interface,
> > >> >> StateRestoreNotification, containing two methods, startRestore and
> > >> >> endRestore.
> > >> >>
> > >> >> While the naming is very similar to methods already proposed on the
> > >> >> StateRestoreListener, the intent of these methods is not user
> > >> >> notification of restore status.  Instead, these new methods are for
> > >> >> internal use by the state store to perform any required setup and
> > >> >> teardown work due to a batch restoration process.
> > >> >>
> > >> >> Here's one current use case: when using RocksDB we should optimize
> > >> >> for a bulk load by setting Options.prepareForBulkLoad().
> > >> >>
> > >> >>    1. If the database has already been opened, we'll need to close
> > >> >>    it, set the "prepareForBulkLoad" option, and re-open the database.
> > >> >>    2. Once the restore is completed we'll need to close and re-open
> > >> >>    the database with the "prepareForBulkLoad" option turned off.
> > >> >>
> > >> >> While we are mentioning the RocksDB use case above, the addition of
> > >> >> this interface is not tied to any specific implementation of a
> > >> >> persistent state store.
> > >> >>
> > >> >> Additionally, a separate interface is needed so that any user can
> > >> >> implement the state restore notification feature regardless of the
> > >> >> state restore callback used.
> > >> >>
> > >> >> I'll also remove the "getStateRestoreListener" method and stick
> > >> >> with the notion of a "global" restore listener for now.
> > >> >>
> > >> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com>
> > >> wrote:
> > >> >>
> > >> >> > Yes it is, more of an oversight on my part, I'll remove it from
> the
> > >> KIP.
> > >> >> >
> > >> >> >
> > >> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
> > >> >> matthias@confluent.io>
> > >> >> > wrote:
> > >> >> >
> > >> >> >> Hi,
> > >> >> >>
> > >> >> >> I think for now it's good enough to start with a single global
> > >> >> >> restore listener. We can incrementally improve this later on if
> > >> >> >> required. Of course, if it's easy to do right away we can also be
> > >> >> >> more fine-grained. But for KTable, we might want to add this after
> > >> >> >> getting rid of all the overloads we have atm.
> > >> >> >>
> > >> >> >> One question: what is the purpose of parameter "endOffset" in
> > >> >> >> #onRestoreEnd() -- isn't this the same value as provided in
> > >> >> >> #onRestoreStart() ?
> > >> >> >>
> > >> >> >>
> > >> >> >> -Matthias
> > >> >> >>
> > >> >> >>
> > >> >> >>
> > >> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> > >> >> >> > Thinking about the custom StateRestoreListener approach, having a
> > >> >> >> > get method on the interface will really only work for custom state
> > >> >> >> > stores.
> > >> >> >> >
> > >> >> >> > So we'll need to provide another way for users to set behavior
> > >> >> >> > with provided state stores.  The only option that comes to mind
> > >> >> >> > now is also adding a parameter to the StateStoreSupplier.
> > >> >> >> >
> > >> >> >> >
> > >> >> >> > Bill
> > >> >> >> >
> > >> >> >> >
> > >> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <
> bbejeck@gmail.com
> > >
> > >> >> wrote:
> > >> >> >> >
> > >> >> >> >> Guozhang,
> > >> >> >> >>
> > >> >> >> >> Thanks for the comments.
> > >> >> >> >>
> > >> >> >> >> 1.  As for the granularity, I agree that having one global
> > >> >> >> >> StateRestoreListener could be restrictive.  But I think it's
> > >> >> >> >> important to have a "setStateRestoreListener" on KafkaStreams as
> > >> >> >> >> this allows users to define an anonymous instance that has access
> > >> >> >> >> to local scope for reporting purposes.  This is similar to the
> > >> >> >> >> pattern we use for KafkaStreams.setStateListener.
> > >> >> >> >>
> > >> >> >> >> As an alternative, what if we add a method to the
> > >> >> >> >> BatchingStateRestoreCallback interface named
> > >> >> >> >> "getStateStoreListener"?  Then in an abstract adapter class we
> > >> >> >> >> return null from getStateStoreListener.  But if users want to
> > >> >> >> >> supply a different StateRestoreListener strategy per callback
> > >> >> >> >> they would simply override the method to return an actual
> > >> >> >> >> instance.
> > >> >> >> >>
> > >> >> >> >> WDYT?
> > >> >> >> >>
> > >> >> >> >> 2.  I'll make the required updates to pass in the ending offset
> > >> >> >> >> at the start as well as the actual name of the state store.
> > >> >> >> >>
> > >> >> >> >> Bill
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
> > >> wangguoz@gmail.com>
> > >> >> >> wrote:
> > >> >> >> >>
> > >> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more
> > >> comments:
> > >> >> >> >>>
> > >> >> >> >>> 1. Setting StateRestoreListener at the KafkaStreams granularity
> > >> >> >> >>> may not be sufficient, as in the listener callback we do not know
> > >> >> >> >>> which store it is restoring right now: if the topic is a changelog
> > >> >> >> >>> topic then from the `TopicPartition` we may be able to infer the
> > >> >> >> >>> state store name, but if the topic is the source topic read as a
> > >> >> >> >>> KTable then we may not know which store it is restoring. Forcing
> > >> >> >> >>> users to infer the state store name from the topic partition name
> > >> >> >> >>> would not be intuitive either. Also, for different stores the
> > >> >> >> >>> listener may be implemented differently, and setting a global
> > >> >> >> >>> listener would force users to branch on the topic-partition names,
> > >> >> >> >>> similarly to what we did in the global timestamp extractor. On the
> > >> >> >> >>> other hand, I also agree that setting the listener at the per-store
> > >> >> >> >>> granularity may be a bit cumbersome, since to let users override it
> > >> >> >> >>> on a specific store we would need to expose some APIs, maybe at
> > >> >> >> >>> StateStoreSupplier. So I would love to hear other people's opinions.
> > >> >> >> >>>
> > >> >> >> >>> If we think that differently implemented restoring callbacks may be
> > >> >> >> >>> less common, then I'd suggest at least replacing the
> > >> >> >> >>> `TopicPartition` parameter with the `String` store name and the
> > >> >> >> >>> `TaskId`?
> > >> >> >> >>>
> > >> >> >> >>> 2. I think we can pass the `long endOffset` to the `onRestoreStart`
> > >> >> >> >>> function as well, as we will have read the endOffset already by
> > >> >> >> >>> then; otherwise users still cannot track the restoration progress
> > >> >> >> >>> (e.g. what percentage has been restored so far, to estimate how
> > >> >> >> >>> long they still need to wait).
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>> Guozhang
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <
> > >> bbejeck@gmail.com>
> > >> >> >> wrote:
> > >> >> >> >>>
> > >> >> >> >>>> Eno,
> > >> >> >> >>>>
> > >> >> >> >>>> Thanks for the comments.
> > >> >> >> >>>>
> > >> >> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
> > >> >> >> >>>> method for backward compatibility, as that is what is used by
> > >> >> >> >>>> current implementing classes. However, as I think about it, it
> > >> >> >> >>>> makes things cleaner to have a single restore method taking a
> > >> >> >> >>>> collection. I'll wait for others to weigh in, but I'm leaning
> > >> >> >> >>>> towards having a single restore method.
> > >> >> >> >>>>
> > >> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
> > >> >> >> >>>> restore process as we load records from each poll request.
> > >> >> >> >>>>
> > >> >> >> >>>>    For example, if the changelog contained 5000 records and
> > >> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
> > >> >> >> >>>> would get called 5 times, each time with the ending offset of the
> > >> >> >> >>>> last record in the batch and the count of the batch.  I'll update
> > >> >> >> >>>> the KIP to add comments above the interface methods.
> > >> >> >> >>>>
> > >> >> >> >>>>
> > >> >> >> >>>> Thanks,
> > >> >> >> >>>> Bill
> > >> >> >> >>>>
> > >> >> >> >>>>
> > >> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> > >> >> >> eno.thereska@gmail.com>
> > >> >> >> >>>> wrote:
> > >> >> >> >>>>
> > >> >> >> >>>>> Thanks Bill,
> > >> >> >> >>>>>
> > >> >> >> >>>>> A couple of questions:
> > >> >> >> >>>>>
> > >> >> >> >>>>>
> > >> >> >> >>>>> 1. Why do we need both restore and restoreAll? Why can't we just
> > >> >> >> >>>>> have one that takes a collection (i.e., restore all)? Are there
> > >> >> >> >>>>> cases when people want to restore one at a time? In that case,
> > >> >> >> >>>>> they could still use restoreAll with just 1 record in the
> > >> >> >> >>>>> collection, right?
> > >> >> >> >>>>>
> > >> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
> > >> >> >> >>>>> comment on top of all three methods? An example might help here.
> > >> >> >> >>>>>
> > >> >> >> >>>>> Thanks
> > >> >> >> >>>>> Eno
> > >> >> >> >>>>>
> > >> >> >> >>>>>
> > >> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com>
> > >> wrote:
> > >> >> >> >>>>>>
> > >> >> >> >>>>>> Guozhang, Damian thanks for the comments.
> > >> >> >> >>>>>>
> > >> >> >> >>>>>> Giving developers the ability to hook into StateStore
> > >> recovery
> > >> >> >> >>> phases
> > >> >> >> >>>> was
> > >> >> >> >>>>>> part of my original intent. However the state the KIP is
> in
> > >> now
> > >> >> >> >>> won't
> > >> >> >> >>>>>> provide this functionality.
> > >> >> >> >>>>>>
> > >> >> >> >>>>>> As a result I'll be doing a significant revision of
> > KIP-167.
> > >> >> I'll
> > >> >> >> >>> be
> > >> >> >> >>>>> sure
> > >> >> >> >>>>>> to incorporate all your comments in the new revision.
> > >> >> >> >>>>>>
> > >> >> >> >>>>>> Thanks,
> > >> >> >> >>>>>> Bill
> > >> >> >> >>>>>>
> > >> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
> > >> >> damian.guy@gmail.com>
> > >> >> >> >>>> wrote:
> > >> >> >> >>>>>>
> > >> >> >> >>>>>>> I'm largely in agreement with what Guozhang has
> suggested,
> > >> >> i.e.,
> > >> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and
> > >> also
> > >> >> need
> > >> >> >> >>> to
> > >> >> >> >>>>> have
> > >> >> >> >>>>>>> the end offset available such that people can use it
> > derive
> > >> >> >> >>> progress.
> > >> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext
> > interface
> > >> >> could
> > >> >> >> >>> be:
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>>> long beginOffset()
> > >> >> >> >>>>>>> long endOffset()
> > >> >> >> >>>>>>> long currentOffset()
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>>> One further thing, this currently doesn't provide
> > developers
> > >> >> the
> > >> >> >> >>>>> ability to
> > >> >> >> >>>>>>> hook into this information if they are using the
> built-in
> > >> >> >> >>> StateStores.
> > >> >> >> >>>>> Is
> > >> >> >> >>>>>>> this something we should be considering?
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
> > >> wangguoz@gmail.com>
> > >> >> >> >>> wrote:
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
> > >> comments:
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called
> only
> > >> once
> > >> >> per
> > >> >> >> >>>> store
> > >> >> >> >>>>>>>> throughout the whole restoration process, and
> restoreAll
> > is
> > >> >> >> called
> > >> >> >> >>>> per
> > >> >> >> >>>>>>>> batch. In that case I feel we can set the
> > >> StateRestoreContext
> > >> >> as
> > >> >> >> a
> > >> >> >> >>>>> second
> > >> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and
> > let
> > >> the
> > >> >> >> >>>> library
> > >> >> >> >>>>> to
> > >> >> >> >>>>>>>> set the corresponding values instead and only let users
> > to
> > >> >> read
> > >> >> >> >>>> (since
> > >> >> >> >>>>>>> the
> > >> >> >> >>>>>>>> collection of key-value pairs do not contain offset
> > >> >> information
> > >> >> >> >>>> anyways
> > >> >> >> >>>>>>>> users cannot really set the offset). The
> > >> "lastOffsetRestored"
> > >> >> >> >>> would
> > >> >> >> >>>> be
> > >> >> >> >>>>>>> the
> > >> >> >> >>>>>>>> starting offset when called on `beginRestore`.
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> 2) Users who wants to implement their own batch
> > restoration
> > >> >> >> >>> callbacks
> > >> >> >> >>>>>>> would
> > >> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll`
> > while
> > >> >> they
> > >> >> >> >>>> either
> > >> >> >> >>>>>>> let
> > >> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic
> in
> > >> >> >> >>> `restoreAll`
> > >> >> >> >>>>>>> only
> > >> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two
> > abstract
> > >> >> impl
> > >> >> >> >>> of
> > >> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
> > >> >> >> >>> endRestore as
> > >> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to
> > call
> > >> >> >> >>> abstract
> > >> >> >> >>>>>>>> `restore` while the other implement `restore` to throw
> > "not
> > >> >> >> >>> supported
> > >> >> >> >>>>>>>> exception" and keep `restoreAll` abstract.
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> > >> >> >> >>>>> StateRestoreContext,
> > >> >> >> >>>>>>>> which is important for users to track the restoration
> > >> progress
> > >> >> >> >>> since
> > >> >> >> >>>>>>>> otherwise they could not tell how many percent of
> > >> restoration
> > >> >> has
> > >> >> >> >>>>>>>> completed.  I.e.:
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> public interface BatchingStateRestoreCallback
> > >> >> >> >>>>>>>>     extends StateRestoreCallback {
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
> > >> >> >> >>>>>>>>                   StateRestoreContext restoreContext);
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> > >> >> >> >>>>>>>> }
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> public interface StateRestoreContext {
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   long lastOffsetRestored();
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   long endOffsetToRestore();
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>   int numberRestored();
> > >> >> >> >>>>>>>> }
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> Guozhang
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
> > >> >> bbejeck@gmail.com>
> > >> >> >> >>>> wrote:
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>> Guozhang, Matthias,
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP,
> (JIRA
> > >> title
> > >> >> >> and
> > >> >> >> >>>>>>>>> description as well).
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>> I had thought about introducing a separate interface
> > >> >> altogether,
> > >> >> >> >>> but
> > >> >> >> >>>>>>>>> extending the current one makes more sense.
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>> As for intermediate callbacks based on time or number
> of
> > >> >> >> >>> records, I
> > >> >> >> >>>>>>> think
> > >> >> >> >>>>>>>>> the latest update to the KIP addresses this point of
> > >> querying
> > >> >> >> for
> > >> >> >> >>>>>>>>> intermediate results, but it would be per batch
> > restored.
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>> Thanks,
> > >> >> >> >>>>>>>>> Bill
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
> > >> >> jim@jagunet.com>
> > >> >> >> >>>>> wrote:
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>>>
> > >> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> > >> >> >> >>>>>>> matthias@confluent.io>
> > >> >> >> >>>>>>>>>> wrote:
> > >> >> >> >>>>>>>>>>>
> > >> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
> > >> change
> > >> >> >> the
> > >> >> >> >>>>>>>> current
> > >> >> >> >>>>>>>>>>> interface, but add a new interface that extends the
> > >> current
> > >> >> >> >>> one.
> > >> >> >> >>>>>>>>>>>
> > >> >> >> >>>>>>>>>>
> > >> >> >> >>>>>>>>>> ++1
> > >> >> >> >>>>>>>>>>
> > >> >> >> >>>>>>>>>>
> > >> >> >> >>>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>> --
> > >> >> >> >>>>>>>> -- Guozhang
> > >> >> >> >>>>>>>>
> > >> >> >> >>>>>>>
> > >> >> >> >>>>>
> > >> >> >> >>>>>
> > >> >> >> >>>>
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>>
> > >> >> >> >>> --
> > >> >> >> >>> -- Guozhang
> > >> >> >> >>>
> > >> >> >> >>
> > >> >> >> >>
> > >> >> >> >
> > >> >> >>
> > >> >> >>
> > >> >> >
> > >> >>
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
Hmm... I'm not sure how this can achieve the "closing the store before
restoration, re-open it with a slightly different config, and then
close-and-reopen store for query" pattern. You need to be able to access
the store object in order to do this, right?


Guozhang
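
One way to picture the concern is the rough Java sketch below. All names
in it (RestoreListenerSketch, ReopenableStore, StoreAccessSketch) are
hypothetical and exist only for illustration; none of them are Kafka
Streams classes. A listener that receives only a store name has nothing
to close or re-open, whereas hooks implemented by the store's own
restore callback can reach the store directly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener shape: it is handed only the store *name*.
interface RestoreListenerSketch {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Toy store that records every open/close so the sequence can be inspected.
// Because the store itself implements the hooks, it has the access it needs.
class ReopenableStore implements RestoreListenerSketch {
    final String name;
    final List<String> events = new ArrayList<>();
    boolean bulkLoadMode = false;

    ReopenableStore(String name) {
        this.name = name;
        open(false); // stores start out opened with the normal config
    }

    void open(boolean bulkLoad) {
        bulkLoadMode = bulkLoad;
        events.add(bulkLoad ? "open(bulk)" : "open(normal)");
    }

    void close() {
        events.add("close");
    }

    @Override
    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        close();     // close the already-open store ...
        open(true);  // ... and re-open it tuned for bulk loading
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        close();     // drop the bulk-load config ...
        open(false); // ... and re-open for normal queries
    }
}

public class StoreAccessSketch {
    // Simulates one restore cycle and returns the open/close sequence.
    static List<String> simulateRestore(ReopenableStore store) {
        store.onRestoreStart(store.name, 0L, 5000L);
        // batches of changelog records would be applied here
        store.onRestoreEnd(store.name, 5000L);
        return store.events;
    }

    public static void main(String[] args) {
        System.out.println(simulateRestore(new ReopenableStore("counts")));
    }
}
```

Whether the hooks should live on the store's callback or be handed the
store instance is exactly the open question in this message; the sketch
takes no position beyond showing that some reference to the store is
required.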

On Mon, Jun 26, 2017 at 7:40 AM, Bill Bejeck <bb...@gmail.com> wrote:

> Thinking about this some more, I have another approach.  Leave the first
> parameter as a String in the StateRestoreListener interface.
>
> But we'll provide 2 default abstract classes one implementing
> StateRestoreCallback and the other implementing the
> BatchingStateRestoreCallback.  Both abstract classes will also implement
> the StateRestoreListener interface with no-op methods provided for the
> restore progress methods.
>
> WDYT?
>
> On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Guozhang,
> >
> > Thanks for the comments.
> >
> > I think that will work, but my concern is it might not be as clear to
> > users that want to receive external notification of the restore progress
> > separately (say for reporting purposes) and still send separate signals
> > to the state store for resource management tasks.
> >
> > However, I like this approach better and I have some ideas I can try in
> > the implementation, so I'll update the KIP accordingly.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> >> More specifically, if we can replace the first parameter from the String
> >> store name to the store instance itself, would that be sufficient to
> >> cover `
> >> StateRestoreNotification`?
> >>
> >> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >>
> >> > Bill,
> >> >
> >> > I'm wondering why we need the `StateRestoreNotification` while still
> >> > having `StateRestoreListener`. Could the above setup be achieved just
> >> > with `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems
> >> > the latter can subsume any use cases intended for the former API.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com>
> wrote:
> >> >
> >> >> I'm going to update the KIP with new interface
> StateRestoreNotification
> >> >> containing two methods, startRestore and endRestore.
> >> >>
> >> >> While naming is very similar to methods already proposed on the
> >> >> StateRestoreListener, the intent of these methods is not for user
> >> >> notification of restore status.  Instead these new methods are for
> >> >> internal
> >> >> use by the state store to perform any required setup and teardown
> work
> >> due
> >> >> to a batch restoration process.
> >> >>
> >> >> Here's one current use case: when using RocksDB we should optimize
> for
> >> a
> >> >> bulk load by setting Options.prepareForBulkload().
> >> >>
> >> >>    1. If the database has already been opened, we'll need to close
> it,
> >> set
> >> >>    the "prepareForBulkload" and re-open the database.
> >> >>    2. Once the restore is completed we'll need to close and re-open
> the
> >> >>    database with the "prepareForBulkload" option turned off.
> >> >>
> >> >> While we are mentioning the RocksDB use case above, the addition of
> >> this
> >> >> interface is not specific to any specific implementation of a
> >> persistent
> >> >> state store.
> >> >>
> >> >> Additionally, a separate interface is needed so that any user can
> >> >> implement
> >> >> the state restore notification feature regardless of the state
> restore
> >> >> callback used.
> >> >>
> >> >> I'll also remove the "getStateRestoreListener" method and stick with
> >> the
> >> >> notion of a "global" restore listener for now.
> >> >>
> >> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com>
> >> wrote:
> >> >>
> >> >> > Yes it is, more of an oversight on my part, I'll remove it from the
> >> KIP.
> >> >> >
> >> >> >
> >> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
> >> >> matthias@confluent.io>
> >> >> > wrote:
> >> >> >
> >> >> >> Hi,
> >> >> >>
> >> >> >> I think for now it's good enough to start with a single global
> >> >> >> restore listener. We can incrementally improve this later on if
> >> >> >> required. Of course, if it's easy to do right away we can also be more
> >> >> >> fine-grained. But for KTable, we might want to add this after getting
> >> >> >> rid of all the overloads we have atm.
> >> >> >>
> >> >> >> One question: what is the purpose of parameter "endOffset" in
> >> >> >> #onRestoreEnd() -- isn't this the same value as provided in
> >> >> >> #onRestoreStart() ?
> >> >> >>
> >> >> >>
> >> >> >> -Matthias
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> >> >> > Thinking about the custom StateRestoreListener approach and
> >> having a
> >> >> get
> >> >> >> > method on the interface will really only work for custom state
> >> >> stores.
> >> >> >> >
> >> >> >> > So we'll need to provide another way for users to set behavior
> >> with
> >> >> >> > provided state stores.  The only option that comes to mind now
> is
> >> >> also
> >> >> >> > adding a parameter to the StateStoreSupplier.
> >> >> >> >
> >> >> >> >
> >> >> >> > Bill
> >> >> >> >
> >> >> >> >
> >> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bbejeck@gmail.com
> >
> >> >> wrote:
> >> >> >> >
> >> >> >> >> Guozhang,
> >> >> >> >>
> >> >> >> >> Thanks for the comments.
> >> >> >> >>
> >> >> >> >> 1.  As for the granularity, I agree that having one global
> >> >> >> >> StateRestoreListener could be restrictive.  But I think it's
> >> >> important
> >> >> >> to
> >> >> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows
> >> >> users
> >> >> >> to
> >> >> >> >> define an anonymous instance that has access to local scope for
> >> >> >> reporting
> >> >> >> >> purposes.  This is a similar pattern we use for
> >> >> >> >> KafkaStreams.setStateListener.
> >> >> >> >>
> >> >> >> >> As an alternative, what if we add a method to the
> >> >> >> BatchingStateRestoreCallback
> >> >> >> >> interface named "getStateStoreListener".   Then in an abstract
> >> >> adapter
> >> >> >> >> class we return null from getStateStoreListener.   But if users
> >> >> want to
> >> >> >> >> supply a different StateRestoreListener strategy per callback
> >> they
> >> >> >> would
> >> >> >> >> simply override the method to return an actual instance.
> >> >> >> >>
> >> >> >> >> WDYT?
> >> >> >> >>
> >> >> >> >> 2.  I'll make the required updates to pass in the ending offset
> >> at
> >> >> the
> >> >> >> >> start as well as the actual name of the state store.
> >> >> >> >>
> >> >> >> >> Bill
> >> >> >> >>
> >> >> >> >>
> >> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
> >> wangguoz@gmail.com>
> >> >> >> wrote:
> >> >> >> >>
> >> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more
> >> comments:
> >> >> >> >>>
> >> >> >> >>> 1. Setting StateRestoreListener at the KafkaStreams granularity
> >> >> >> >>> may not be sufficient, as in the listener callback we do not know
> >> >> >> >>> which store it is restoring right now: if the topic is a changelog
> >> >> >> >>> topic then from the `TopicPartition` we may be able to infer the
> >> >> >> >>> state store name, but if the topic is the source topic read as a
> >> >> >> >>> KTable then we may not know which store it is restoring. Forcing
> >> >> >> >>> users to infer the state store name from the topic partition name
> >> >> >> >>> would not be intuitive either. Also, for different stores the
> >> >> >> >>> listener may be implemented differently, and setting a global
> >> >> >> >>> listener would force users to branch on the topic-partition names,
> >> >> >> >>> similarly to what we did in the global timestamp extractor. On the
> >> >> >> >>> other hand, I also agree that setting the listener at the per-store
> >> >> >> >>> granularity may be a bit cumbersome, since to let users override it
> >> >> >> >>> on a specific store we would need to expose some APIs, maybe at
> >> >> >> >>> StateStoreSupplier. So I would love to hear other people's opinions.
> >> >> >> >>>
> >> >> >> >>> If we think that differently implemented restoring callbacks may be
> >> >> >> >>> less common, then I'd suggest at least replacing the
> >> >> >> >>> `TopicPartition` parameter with the `String` store name and the
> >> >> >> >>> `TaskId`?
> >> >> >> >>>
> >> >> >> >>> 2. I think we can pass the `long endOffset` to the `onRestoreStart`
> >> >> >> >>> function as well, as we will have read the endOffset already by
> >> >> >> >>> then; otherwise users still cannot track the restoration progress
> >> >> >> >>> (e.g. what percentage has been restored so far, to estimate how
> >> >> >> >>> long they still need to wait).
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> Guozhang
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>>
> >> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <
> >> bbejeck@gmail.com>
> >> >> >> wrote:
> >> >> >> >>>
> >> >> >> >>>> Eno,
> >> >> >> >>>>
> >> >> >> >>>> Thanks for the comments.
> >> >> >> >>>>
> >> >> >> >>>> 1. As for having both restore and restoreAll, I kept the
> >> restore
> >> >> >> method
> >> >> >> >>> for
> >> >> >> >>>> backward compatibility as that is what is used by current
> >> >> >> implementing
> >> >> >> >>>> classes. However as I think about it makes things cleaner to
> >> have
> >> >> a
> >> >> >> >>> single
> >> >> >> >>>> restore method taking a collection. I'll wait for others to
> >> weigh
> >> >> in,
> >> >> >> >>> but
> >> >> >> >>>> I'm leaning towards having a single restore method.
> >> >> >> >>>>
> >> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
> >> >> >> >>>> restore process as we load records from each poll request.
> >> >> >> >>>>
> >> >> >> >>>>    For example, if the changelog contained 5000 records and
> >> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
> >> >> >> >>>> would get called 5 times, each time with the ending offset of the
> >> >> >> >>>> last record in the batch and the count of the batch.  I'll update
> >> >> >> >>>> the KIP to add comments above the interface methods.
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> Thanks,
> >> >> >> >>>> Bill
> >> >> >> >>>>
> >> >> >> >>>>
> >> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> >> >> >> eno.thereska@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>
> >> >> >> >>>>> Thanks Bill,
> >> >> >> >>>>>
> >> >> >> >>>>> A couple of questions:
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>> 1. why do we need both restore and restoreAll, why can't we
> >> just
> >> >> have
> >> >> >> >>> one,
> >> >> >> >>>>> that takes a collection (i.e., restore all)? Are there cases
> >> when
> >> >> >> >>> people
> >> >> >> >>>>> want to restore one at a time? In that case, they could
> still
> >> use
> >> >> >> >>>>> restoreAll with just 1 record in the collection right?
> >> >> >> >>>>>
> >> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a
> small
> >> >> >> comment
> >> >> >> >>> on
> >> >> >> >>>>> top of all three methods. An example might help here.
> >> >> >> >>>>>
> >> >> >> >>>>> Thanks
> >> >> >> >>>>> Eno
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com>
> >> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>> Guozhang, Damian thanks for the comments.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Giving developers the ability to hook into StateStore
> >> recovery
> >> >> >> >>> phases
> >> >> >> >>>> was
> >> >> >> >>>>>> part of my original intent. However the state the KIP is in
> >> now
> >> >> >> >>> won't
> >> >> >> >>>>>> provide this functionality.
> >> >> >> >>>>>>
> >> >> >> >>>>>> As a result I'll be doing a significant revision of
> KIP-167.
> >> >> I'll
> >> >> >> >>> be
> >> >> >> >>>>> sure
> >> >> >> >>>>>> to incorporate all your comments in the new revision.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Thanks,
> >> >> >> >>>>>> Bill
> >> >> >> >>>>>>
> >> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
> >> >> damian.guy@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
> >> >> i.e.,
> >> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and
> >> also
> >> >> need
> >> >> >> >>> to
> >> >> >> >>>>> have
> >> >> >> >>>>>>> the end offset available such that people can use it
> derive
> >> >> >> >>> progress.
> >> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext
> interface
> >> >> could
> >> >> >> >>> be:
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> long beginOffset()
> >> >> >> >>>>>>> long endOffset()
> >> >> >> >>>>>>> long currentOffset()
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> One further thing, this currently doesn't provide
> developers
> >> >> the
> >> >> >> >>>>> ability to
> >> >> >> >>>>>>> hook into this information if they are using the built-in
> >> >> >> >>> StateStores.
> >> >> >> >>>>> Is
> >> >> >> >>>>>>> this something we should be considering?
> >> >> >> >>>>>>>
> >> >> >> >>>>>>>
> >> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
> >> wangguoz@gmail.com>
> >> >> >> >>> wrote:
> >> >> >> >>>>>>>
> >> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
> >> comments:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only
> >> once
> >> >> per
> >> >> >> >>>> store
> >> >> >> >>>>>>>> throughout the whole restoration process, and restoreAll
> is
> >> >> >> called
> >> >> >> >>>> per
> >> >> >> >>>>>>>> batch. In that case I feel we can set the
> >> StateRestoreContext
> >> >> as
> >> >> >> a
> >> >> >> >>>>> second
> >> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and
> let
> >> the
> >> >> >> >>>> library
> >> >> >> >>>>> to
> >> >> >> >>>>>>>> set the corresponding values instead and only let users
> to
> >> >> read
> >> >> >> >>>> (since
> >> >> >> >>>>>>> the
> >> >> >> >>>>>>>> collection of key-value pairs do not contain offset
> >> >> information
> >> >> >> >>>> anyways
> >> >> >> >>>>>>>> users cannot really set the offset). The
> >> "lastOffsetRestored"
> >> >> >> >>> would
> >> >> >> >>>> be
> >> >> >> >>>>>>> the
> >> >> >> >>>>>>>> starting offset when called on `beginRestore`.
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 2) Users who wants to implement their own batch
> restoration
> >> >> >> >>> callbacks
> >> >> >> >>>>>>> would
> >> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll`
> while
> >> >> they
> >> >> >> >>>> either
> >> >> >> >>>>>>> let
> >> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
> >> >> >> >>> `restoreAll`
> >> >> >> >>>>>>> only
> >> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two
> abstract
> >> >> impl
> >> >> >> >>> of
> >> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
> >> >> >> >>> endRestore as
> >> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to
> call
> >> >> >> >>> abstract
> >> >> >> >>>>>>>> `restore` while the other implement `restore` to throw
> "not
> >> >> >> >>> supported
> >> >> >> >>>>>>>> exception" and keep `restoreAll` abstract.
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> >> >> >> >>>>> StateRestoreContext,
> >> >> >> >>>>>>>> which is important for users to track the restoration
> >> progress
> >> >> >> >>> since
> >> >> >> >>>>>>>> otherwise they could not tell how many percent of
> >> restoration
> >> >> has
> >> >> >> >>>>>>>> completed.  I.e.:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
> >> >> >> >>>>>>> StateRestoreCallback
> >> >> >> >>>>>>>> {
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
> >> >> records,
> >> >> >> >>>>>>>> StateRestoreContext
> >> >> >> >>>>>>>> restoreContext);
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >> >> >> >>>>>>>> }
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> public interface StateRestoreContext {
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  long lastOffsetRestored();
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  long endOffsetToRestore();
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>  int numberRestored();
> >> >> >> >>>>>>>> }
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> Guozhang
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
> >> >> bbejeck@gmail.com>
> >> >> >> >>>> wrote:
> >> >> >> >>>>>>>>
> >> >> >> >>>>>>>>> Guozhang, Matthias,
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA
> >> title
> >> >> >> and
> >> >> >> >>>>>>>>> description as well).
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> I had thought about introducing a separate interface
> >> >> altogether,
> >> >> >> >>> but
> >> >> >> >>>>>>>>> extending the current one makes more sense.
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
> >> >> >> >>> records, I
> >> >> >> >>>>>>> think
> >> >> >> >>>>>>>>> the latest update to the KIP addresses this point of
> >> querying
> >> >> >> for
> >> >> >> >>>>>>>>> intermediate results, but it would be per batch
> restored.
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> Thanks,
> >> >> >> >>>>>>>>> Bill
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
> >> >> jim@jagunet.com>
> >> >> >> >>>>> wrote:
> >> >> >> >>>>>>>>>
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >> >> >> >>>>>>> matthias@confluent.io>
> >> >> >> >>>>>>>>>> wrote:
> >> >> >> >>>>>>>>>>>
> >> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
> >> change
> >> >> >> the
> >> >> >> >>>>>>>> current
> >> >> >> >>>>>>>>>>> interface, but add a new interface that extends the
> >> current
> >> >> >> >>> one.
> >> >> >> >>>>>>>>>>>
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>> ++1
> >> >> >> >>>>>>>>>>
> >> >> >> >>>>>>>>>>



-- 
-- Guozhang

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Thinking about this some more, I have another approach.  Leave the first
parameter as a String in the StateRestoreListener interface.

But we'll provide two default abstract classes, one implementing
StateRestoreCallback and the other implementing
BatchingStateRestoreCallback.  Both abstract classes will also implement
the StateRestoreListener interface, with no-op methods provided for the
restore progress methods.
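
As a rough sketch of the batching variant (only an illustration, not the
KIP's final API): the three interfaces below are simplified stand-ins, and
the names NoOpListenerRestoreCallback, NoOpListenerBatchingRestoreCallback,
BulkLoadAwareCallback and RestoreSketch, as well as every method signature
shown, are assumptions made up for this example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the interfaces under discussion (assumed shapes).
interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
}

interface StateRestoreListener {
    // The first parameter stays a String store name, as proposed above.
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical abstract class 1: single-record restore, no-op progress methods.
abstract class NoOpListenerRestoreCallback
        implements StateRestoreCallback, StateRestoreListener {
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// Hypothetical abstract class 2: batch restore, no-op progress methods.
abstract class NoOpListenerBatchingRestoreCallback
        implements BatchingStateRestoreCallback, StateRestoreListener {
    @Override public void restore(byte[] key, byte[] value) {
        // Assumption for this sketch: batching callbacks only support restoreAll.
        throw new UnsupportedOperationException("use restoreAll");
    }
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) { }
    @Override public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) { }
    @Override public void onRestoreEnd(String storeName, long totalRestored) { }
}

// A store-side callback overrides only the hooks it cares about.
final class BulkLoadAwareCallback extends NoOpListenerBatchingRestoreCallback {
    final List<String> events = new ArrayList<>();

    @Override public void restoreAll(Collection<Map.Entry<byte[], byte[]>> records) {
        events.add("batch:" + records.size());
    }
    @Override public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        events.add("bulk-load-on:" + storeName);   // e.g. re-open tuned for bulk load
    }
    @Override public void onRestoreEnd(String storeName, long totalRestored) {
        events.add("bulk-load-off:" + storeName);  // e.g. re-open with normal settings
    }
}

final class RestoreSketch {
    // Drives the callback the way a restore loop might: start, one batch, end.
    static List<String> run() {
        BulkLoadAwareCallback cb = new BulkLoadAwareCallback();
        List<Map.Entry<byte[], byte[]>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {1}, new byte[] {2}));
        batch.add(new AbstractMap.SimpleEntry<>(new byte[] {3}, new byte[] {4}));
        cb.onRestoreStart("counts", 0L, 2L);
        cb.restoreAll(batch);
        cb.onBatchRestored("counts", 2L, batch.size()); // inherited no-op
        cb.onRestoreEnd("counts", batch.size());
        return cb.events;
    }
}
```

The idea is that a store which needs setup and teardown around a restore
overrides onRestoreStart / onRestoreEnd, while everything else inherits
the no-ops and is unaffected.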

WDYT?

On Mon, Jun 26, 2017 at 10:13 AM, Bill Bejeck <bb...@gmail.com> wrote:

> Guozhang,
>
> Thanks for the comments.
>
> I think that will work, but my concern is it might not be as clear to
> users that want to receive external notification of the restore progress
> separately (say for reporting purposes) and still send separate signals to
> the state store for resource management tasks.
>
> However I like this approach better and I have some ideas I can do in the
> implementation, so I'll update the KIP accordingly.
>
> Thanks,
> Bill
>
> On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
>> More specifically, if we can replace the first parameter from the String
>> store name to the store instance itself, would that be sufficient to
>> cover `StateRestoreNotification`?
>>
>> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>>
>> > Bill,
>> >
>> > I'm wondering why we need the `StateRestoreNotification` while still
>> > having `StateRestoreListener`. Could the above setup be achieved just
>> > with `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems
>> > the latter can subsume any use cases intended for the former API.
>> >
>> > Guozhang
>> >
>> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com> wrote:
>> >
>> >> I'm going to update the KIP with new interface StateRestoreNotification
>> >> containing two methods, startRestore and endRestore.
>> >>
>> >> While naming is very similar to methods already proposed on the
>> >> StateRestoreListener, the intent of these methods is not for user
>> >> notification of restore status.  Instead these new methods are for
>> >> internal
>> >> use by the state store to perform any required setup and teardown work
>> due
>> >> to a batch restoration process.
>> >>
>> >> Here's one current use case: when using RocksDB we should optimize for
>> a
>> >> bulk load by setting Options.prepareForBulkload().
>> >>
>> >>    1. If the database has already been opened, we'll need to close it,
>> set
>> >>    the "prepareForBulkload" and re-open the database.
>> >>    2. Once the restore is completed we'll need to close and re-open the
>> >>    database with the "prepareForBulkload" option turned off.
>> >>
>> >> While we are mentioning the RocksDB use case above, the addition of
>> this
>> >> interface is not specific to any specific implementation of a
>> persistent
>> >> state store.
>> >>
>> >> Additionally, a separate interface is needed so that any user can
>> >> implement
>> >> the state restore notification feature regardless of the state restore
>> >> callback used.
>> >>
>> >> I'll also remove the "getStateRestoreListener" method and stick with
>> the
>> >> notion of a "global" restore listener for now.
>> >>
>> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com>
>> wrote:
>> >>
>> >> > Yes it is, more of an oversight on my part, I'll remove it from the
>> KIP.
>> >> >
>> >> >
>> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
>> >> matthias@confluent.io>
>> >> > wrote:
>> >> >
>> >> >> Hi,
>> >> >>
>> >> >> I think for now it's good enough to start with a single global restore
>> >> >> listener. We can incrementally improve this later on if required. Of
>> >> >> course, if it's easy to do right away we can also be more fine
>> grained.
>> >> >> But for KTable, we might want to add this after getting rid of all
>> the
>> >> >> overloads we have atm.
>> >> >>
>> >> >> One question: what is the purpose of parameter "endOffset" in
>> >> >> #onRestoreEnd() -- isn't this the same value as provided in
>> >> >> #onRestoreStart() ?
>> >> >>
>> >> >>
>> >> >> -Matthias
>> >> >>
>> >> >>
>> >> >>
>> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> >> >> > Thinking about the custom StateRestoreListener approach and
>> having a
>> >> get
>> >> >> > method on the interface will really only work for custom state
>> >> stores.
>> >> >> >
>> >> >> > So we'll need to provide another way for users to set behavior
>> with
>> >> >> > provided state stores.  The only option that comes to mind now is
>> >> also
>> >> >> > adding a parameter to the StateStoreSupplier.
>> >> >> >
>> >> >> >
>> >> >> > Bill
>> >> >> >
>> >> >> >
>> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com>
>> >> wrote:
>> >> >> >
>> >> >> >> Guozhang,
>> >> >> >>
>> >> >> >> Thanks for the comments.
>> >> >> >>
>> >> >> >> 1.  As for the granularity, I agree that having one global
>> >> >> >> StateRestoreListener could be restrictive.  But I think it's
>> >> important
>> >> >> to
>> >> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows
>> >> users
>> >> >> to
>> >> >> >> define an anonymous instance that has access to local scope for
>> >> >> reporting
>> >> >> >> purposes.  This is a similar pattern we use for
>> >> >> >> KafkaStreams.setStateListener.
>> >> >> >>
>> >> >> >> As an alternative, what if we add a method to the
>> >> >> BatchingStateRestoreCallback
>> >> >> >> interface named "getStateStoreListener".   Then in an abstract
>> >> adapter
>> >> >> >> class we return null from getStateStoreListener.   But if users
>> >> want to
>> >> >> >> supply a different StateRestoreListener strategy per callback
>> they
>> >> >> would
>> >> >> >> simply override the method to return an actual instance.
>> >> >> >>
>> >> >> >> WDYT?
>> >> >> >>
>> >> >> >> 2.  I'll make the required updates to pass in the ending offset
>> at
>> >> the
>> >> >> >> start as well as the actual name of the state store.
>> >> >> >>
>> >> >> >> Bill
>> >> >> >>
>> >> >> >>
>> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
>> wangguoz@gmail.com>
>> >> >> wrote:
>> >> >> >>
>> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more
>> comments:
>> >> >> >>>
>> >> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity
>> may
>> >> >> not be
>> >> >> >>> sufficient, as in the listener callback we do not know which store it is
>> >> >> >>> restoring right now: if the topic is a changelog topic then from
>> >> the
>> >> >> >>> `TopicPartition` we may be able to infer the state store name,
>> but
>> >> if
>> >> >> the
>> >> >> >>> topic is the source topic read as a KTable then we may not know
>> >> which
>> >> >> >>> store
>> >> >> >>> it is restoring right now; plus forcing users to infer the state
>> >> store
>> >> >> >>> name
>> >> >> >>> from the topic partition name would not be intuitive as well.
>> Plus
>> >> for
>> >> >> >>> different stores the listener may be implemented differently,
>> and
>> >> >> setting
>> >> >> >>> a
>> >> >> >>> global listener would force users to branch on the
>> topic-partition
>> >> >> names,
>> >> >> >>> similarly to what we did in the global timestamp extractor. On
>> the
>> >> >> other
>> >> >> >>> hand, I also agree that setting the listener on the per-store
>> >> >> granularity
>> >> >> >>> may be a bit cumbersome since if users want to override it on a
>> >> >> specific
>> >> >> >>> store it needs to expose some APIs maybe at StateStoreSupplier.
>> So
>> >> >> would
>> >> >> >>> love to hear other people's opinions.
>> >> >> >>>
>> >> >> >>> If we think that different implemented restoring callback may be
>> >> less
>> >> >> >>> common, then I'd suggest at least replace the `TopicPartition`
>> >> >> parameter
>> >> >> >>> with the `String` store name and the `TaskId`?
>> >> >> >>>
>> >> >> >>> 2. I think we can pass in the `long endOffset` in the
>> >> `onRestoreStart`
>> >> >> >>> function as well, as we will have read the endOffset already by
>> >> then;
>> >> >> >>> otherwise users can still not be able to track the restoration
>> >> >> progress
>> >> >> >>> (e.g. how much percentage I have been restoring so far, to
>> estimate
>> >> >> on how
>> >> >> >>> long I still need to wait).
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> Guozhang
>> >> >> >>>
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <
>> bbejeck@gmail.com>
>> >> >> wrote:
>> >> >> >>>
>> >> >> >>>> Eno,
>> >> >> >>>>
>> >> >> >>>> Thanks for the comments.
>> >> >> >>>>
>> >> >> >>>> 1. As for having both restore and restoreAll, I kept the
>> restore
>> >> >> method
>> >> >> >>> for
>> >> >> >>>> backward compatibility as that is what is used by current
>> >> >> implementing
>> >> >> >>>> classes. However as I think about it makes things cleaner to
>> have
>> >> a
>> >> >> >>> single
>> >> >> >>>> restore method taking a collection. I'll wait for others to
>> weigh
>> >> in,
>> >> >> >>> but
>> >> >> >>>> I'm leaning towards having a single restore method.
>> >> >> >>>>
>> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
>> >> restore
>> >> >> >>> process
>> >> >> >>>> as we load records from each poll request.
>> >> >> >>>>
>> >> >> >>>>    For example if the change log contained 5000 records and
>> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
>> >> would
>> >> >> get
>> >> >> >>>> called 5 times each time with the ending offset of the last
>> >> record in
>> >> >> >>> the
>> >> >> >>>> batch and the count    of the batch.   I'll update the KIP to
>> add
>> >> >> >>> comments
>> >> >> >>>> above the interface methods.
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> Thanks,
>> >> >> >>>> Bill
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
>> >> >> eno.thereska@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>
>> >> >> >>>>> Thanks Bill,
>> >> >> >>>>>
>> >> >> >>>>> A couple of questions:
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>> 1. why do we need both restore and restoreAll, why can't we
>> just
>> >> have
>> >> >> >>> one,
>> >> >> >>>>> that takes a collection (i.e., restore all)? Are there cases
>> when
>> >> >> >>> people
>> >> >> >>>>> want to restore one at a time? In that case, they could still
>> use
>> >> >> >>>>> restoreAll with just 1 record in the collection right?
>> >> >> >>>>>
>> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> >> >> comment
>> >> >> >>> on
>> >> >> >>>>> top of all three methods. An example might help here.
>> >> >> >>>>>
>> >> >> >>>>> Thanks
>> >> >> >>>>> Eno
>> >> >> >>>>>
>> >> >> >>>>>
>> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com>
>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Guozhang, Damian thanks for the comments.
>> >> >> >>>>>>
>> >> >> >>>>>> Giving developers the ability to hook into StateStore
>> recovery
>> >> >> >>> phases
>> >> >> >>>> was
>> >> >> >>>>>> part of my original intent. However the state the KIP is in
>> now
>> >> >> >>> won't
>> >> >> >>>>>> provide this functionality.
>> >> >> >>>>>>
>> >> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
>> >> I'll
>> >> >> >>> be
>> >> >> >>>>> sure
>> >> >> >>>>>> to incorporate all your comments in the new revision.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks,
>> >> >> >>>>>> Bill
>> >> >> >>>>>>
>> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
>> >> damian.guy@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
>> >> i.e.,
>> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and
>> also
>> >> need
>> >> >> >>> to
>> >> >> >>>>> have
>> >> >> >>>>>>> the end offset available such that people can use it derive
>> >> >> >>> progress.
>> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext interface
>> >> could
>> >> >> >>> be:
>> >> >> >>>>>>>
>> >> >> >>>>>>> long beginOffset()
>> >> >> >>>>>>> long endOffset()
>> >> >> >>>>>>> long currentOffset()
>> >> >> >>>>>>>
>> >> >> >>>>>>> One further thing, this currently doesn't provide developers
>> >> the
>> >> >> >>>>> ability to
>> >> >> >>>>>>> hook into this information if they are using the built-in
>> >> >> >>> StateStores.
>> >> >> >>>>> Is
>> >> >> >>>>>>> this something we should be considering?
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
>> wangguoz@gmail.com>
>> >> >> >>> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
>> comments:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only
>> once
>> >> per
>> >> >> >>>> store
>> >> >> >>>>>>>> throughout the whole restoration process, and restoreAll is
>> >> >> called
>> >> >> >>>> per
>> >> >> >>>>>>>> batch. In that case I feel we can set the
>> StateRestoreContext
>> >> as
>> >> >> a
>> >> >> >>>>> second
>> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and let
>> the
>> >> >> >>>> library
>> >> >> >>>>> to
>> >> >> >>>>>>>> set the corresponding values instead and only let users to
>> >> read
>> >> >> >>>> (since
>> >> >> >>>>>>> the
>> >> >> >>>>>>>> collection of key-value pairs do not contain offset
>> >> information
>> >> >> >>>> anyways
>> >> >> >>>>>>>> users cannot really set the offset). The
>> "lastOffsetRestored"
>> >> >> >>> would
>> >> >> >>>> be
>> >> >> >>>>>>> the
>> >> >> >>>>>>>> starting offset when called on `beginRestore`.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 2) Users who wants to implement their own batch restoration
>> >> >> >>> callbacks
>> >> >> >>>>>>> would
>> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll` while
>> >> they
>> >> >> >>>> either
>> >> >> >>>>>>> let
>> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
>> >> >> >>> `restoreAll`
>> >> >> >>>>>>> only
>> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two abstract
>> >> impl
>> >> >> >>> of
>> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
>> >> >> >>> endRestore as
>> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
>> >> >> >>> abstract
>> >> >> >>>>>>>> `restore` while the other implement `restore` to throw "not
>> >> >> >>> supported
>> >> >> >>>>>>>> exception" and keep `restoreAll` abstract.
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >> >> >>>>> StateRestoreContext,
>> >> >> >>>>>>>> which is important for users to track the restoration
>> progress
>> >> >> >>> since
>> >> >> >>>>>>>> otherwise they could not tell how many percent of
>> restoration
>> >> has
>> >> >> >>>>>>>> completed.  I.e.:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
>> >> >> >>>>>>> StateRestoreCallback
>> >> >> >>>>>>>> {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
>> >> records,
>> >> >> >>>>>>>> StateRestoreContext
>> >> >> >>>>>>>> restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> public interface StateRestoreContext {
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  long lastOffsetRestored();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  long endOffsetToRestore();
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>  int numberRestored();
>> >> >> >>>>>>>> }
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> Guozhang
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>
>> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
>> >> bbejeck@gmail.com>
>> >> >> >>>> wrote:
>> >> >> >>>>>>>>
>> >> >> >>>>>>>>> Guozhang, Matthias,
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA
>> title
>> >> >> and
>> >> >> >>>>>>>>> description as well).
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> I had thought about introducing a separate interface
>> >> altogether,
>> >> >> >>> but
>> >> >> >>>>>>>>> extending the current one makes more sense.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >> >> >>> records, I
>> >> >> >>>>>>> think
>> >> >> >>>>>>>>> the latest update to the KIP addresses this point of
>> querying
>> >> >> for
>> >> >> >>>>>>>>> intermediate results, but it would be per batch restored.
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> Thanks,
>> >> >> >>>>>>>>> Bill
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
>> >> jim@jagunet.com>
>> >> >> >>>>> wrote:
>> >> >> >>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> >> >> >>>>>>> matthias@confluent.io>
>> >> >> >>>>>>>>>> wrote:
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
>> change
>> >> >> the
>> >> >> >>>>>>>> current
>> >> >> >>>>>>>>>>> interface, but add a new interface that extends the
>> current
>> >> >> >>> one.
>> >> >> >>>>>>>>>>>
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>> ++1
>> >> >> >>>>>>>>>>
>> >> >> >>>>>>>>>>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Guozhang,

Thanks for the comments.

I think that will work, but my concern is it might not be as clear to users
that want to receive external notification of the restore progress
separately (say for reporting purposes) and still send separate signals to
the state store for resource management tasks.

However, I like this approach better and I have some ideas for the
implementation, so I'll update the KIP accordingly.

Thanks,
Bill

On Wed, Jun 21, 2017 at 10:14 PM, Guozhang Wang <wa...@gmail.com> wrote:

> More specifically, if we can replace the first parameter from the String
> store name to the store instance itself, would that be sufficient to cover
> `StateRestoreNotification`?
>
> On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Bill,
> >
> > I'm wondering why we need the `StateRestoreNotification` while still
> > having `StateRestoreListener`. Could the above setup be achieved just with
> > `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
> > latter can subsume any use cases intended for the former API.
> >
> > Guozhang
> >
> > On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com> wrote:
> >
> >> I'm going to update the KIP with new interface StateRestoreNotification
> >> containing two methods, startRestore and endRestore.
> >>
> >> While naming is very similar to methods already proposed on the
> >> StateRestoreListener, the intent of these methods is not for user
> >> notification of restore status.  Instead these new methods are for
> >> internal
> >> use by the state store to perform any required setup and teardown work
> due
> >> to a batch restoration process.
> >>
> >> Here's one current use case: when using RocksDB we should optimize for a
> >> bulk load by setting Options.prepareForBulkload().
> >>
> >>    1. If the database has already been opened, we'll need to close it,
> set
> >>    the "prepareForBulkload" and re-open the database.
> >>    2. Once the restore is completed we'll need to close and re-open the
> >>    database with the "prepareForBulkload" option turned off.
> >>
> >> While we are mentioning the RocksDB use case above, the addition of this
> >> interface is not specific to any specific implementation of a persistent
> >> state store.
> >>
> >> Additionally, a separate interface is needed so that any user can
> >> implement
> >> the state restore notification feature regardless of the state restore
> >> callback used.
> >>
> >> I'll also remove the "getStateRestoreListener" method and stick with the
> >> notion of a "global" restore listener for now.
> >>
> >> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com> wrote:
> >>
> >> > Yes it is, more of an oversight on my part, I'll remove it from the
> KIP.
> >> >
> >> >
> >> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
> >> matthias@confluent.io>
> >> > wrote:
> >> >
> >> >> Hi,
> >> >>
> >> >> I think for now it's good enough to start with a single global restore
> >> >> listener. We can incrementally improve this later on if required. Of
> >> >> course, if it's easy to do right away we can also be more fine
> grained.
> >> >> But for KTable, we might want to add this after getting rid of all
> the
> >> >> overloads we have atm.
> >> >>
> >> >> One question: what is the purpose of parameter "endOffset" in
> >> >> #onRestoreEnd() -- isn't this the same value as provided in
> >> >> #onRestoreStart() ?
> >> >>
> >> >>
> >> >> -Matthias
> >> >>
> >> >>
> >> >>
> >> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> >> > Thinking about the custom StateRestoreListener approach and having
> a
> >> get
> >> >> > method on the interface will really only work for custom state
> >> stores.
> >> >> >
> >> >> > So we'll need to provide another way for users to set behavior with
> >> >> > provided state stores.  The only option that comes to mind now is
> >> also
> >> >> > adding a parameter to the StateStoreSupplier.
> >> >> >
> >> >> >
> >> >> > Bill
> >> >> >
> >> >> >
> >> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com>
> >> wrote:
> >> >> >
> >> >> >> Guozhang,
> >> >> >>
> >> >> >> Thanks for the comments.
> >> >> >>
> >> >> >> 1.  As for the granularity, I agree that having one global
> >> >> >> StateRestoreListener could be restrictive.  But I think it's
> >> important
> >> >> to
> >> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows
> >> users
> >> >> to
> >> >> >> define an anonymous instance that has access to local scope for
> >> >> reporting
> >> >> >> purposes.  This is a similar pattern we use for
> >> >> >> KafkaStreams.setStateListener.
> >> >> >>
> >> >> >> As an alternative, what if we add a method to the
> >> >> BatchingStateRestoreCallback
> >> >> >> interface named "getStateStoreListener".   Then in an abstract
> >> adapter
> >> >> >> class we return null from getStateStoreListener.   But if users
> >> want to
> >> >> >> supply a different StateRestoreListener strategy per callback they
> >> >> would
> >> >> >> simply override the method to return an actual instance.
> >> >> >>
> >> >> >> WDYT?
> >> >> >>
> >> >> >> 2.  I'll make the required updates to pass in the ending offset at
> >> the
> >> >> >> start as well as the actual name of the state store.
> >> >> >>
> >> >> >> Bill
> >> >> >>
> >> >> >>
> >> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <
> wangguoz@gmail.com>
> >> >> wrote:
> >> >> >>
> >> >> >>> Thanks Bill for the updated wiki. I have a couple of more
> comments:
> >> >> >>>
> >> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity
> may
> >> >> not be
> >> >> >>> sufficient, as in the listener callback we do not know which store it
> is
> >> >> >>> restoring right now: if the topic is a changelog topic then from
> >> the
> >> >> >>> `TopicPartition` we may be able to infer the state store name,
> but
> >> if
> >> >> the
> >> >> >>> topic is the source topic read as a KTable then we may not know
> >> which
> >> >> >>> store
> >> >> >>> it is restoring right now; plus forcing users to infer the state
> >> store
> >> >> >>> name
> >> >> >>> from the topic partition name would not be intuitive as well.
> Plus
> >> for
> >> >> >>> different stores the listener may be implemented differently, and
> >> >> setting
> >> >> >>> a
> >> >> >>> global listener would force users to branch on the
> topic-partition
> >> >> names,
> >> >> >>> similarly to what we did in the global timestamp extractor. On
> the
> >> >> other
> >> >> >>> hand, I also agree that setting the listener on the per-store
> >> >> granularity
> >> >> >>> may be a bit cumbersome since if users want to override it on a
> >> >> specific
> >> >> >>> store it needs to expose some APIs maybe at StateStoreSupplier.
> So
> >> >> would
> >> >> >>> love to hear other people's opinions.
> >> >> >>>
> >> >> >>> If we think that different implemented restoring callback may be
> >> less
> >> >> >>> common, then I'd suggest at least replace the `TopicPartition`
> >> >> parameter
> >> >> >>> with the `String` store name and the `TaskId`?
> >> >> >>>
> >> >> >>> 2. I think we can pass in the `long endOffset` in the
> >> `onRestoreStart`
> >> >> >>> function as well, as we will have read the endOffset already by
> >> then;
> >> >> >>> otherwise users will still not be able to track the restoration
> >> >> progress
> >> >> >>> (e.g. how much percentage I have been restoring so far, to
> estimate
> >> >> on how
> >> >> >>> long I still need to wait).
> >> >> >>>
> >> >> >>>
> >> >> >>> Guozhang
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bbejeck@gmail.com
> >
> >> >> wrote:
> >> >> >>>
> >> >> >>>> Eno,
> >> >> >>>>
> >> >> >>>> Thanks for the comments.
> >> >> >>>>
> >> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
> >> >> method
> >> >> >>> for
> >> >> >>>> backward compatibility as that is what is used by current
> >> >> implementing
> >> >> >>>> classes. However, as I think about it, it makes things cleaner to
> have
> >> a
> >> >> >>> single
> >> >> >>>> restore method taking a collection. I'll wait for others to
> weigh
> >> in,
> >> >> >>> but
> >> >> >>>> I'm leaning towards having a single restore method.
> >> >> >>>>
> >> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
> >> restore
> >> >> >>> process
> >> >> >>>> as we load records from each poll request.
> >> >> >>>>
> >> >> >>>>    For example if the change log contained 5000 records and
> >> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
> >> would
> >> >> get
> >> >> >>>> called 5 times each time with the ending offset of the last
> >> record in
> >> >> >>> the
> >> >> >>>> batch and the count    of the batch.   I'll update the KIP to
> add
> >> >> >>> comments
> >> >> >>>> above the interface methods.
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> Thanks,
> >> >> >>>> Bill
> >> >> >>>>
> >> >> >>>>
> >> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> >> >> eno.thereska@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>
> >> >> >>>>> Thanks Bill,
> >> >> >>>>>
> >> >> >>>>> A couple of questions:
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>> 1. why do we need both restore and restoreAll, why can't we just
> >> have
> >> >> >>> one,
> >> >> >>>>> that takes a collection (i.e., restore all)? Are there cases
> when
> >> >> >>> people
> >> >> >>>>> want to restore one at a time? In that case, they could still
> use
> >> >> >>>>> restoreAll with just 1 record in the collection right?
> >> >> >>>>>
> >> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
> >> >> comment
> >> >> >>> on
> >> >> >>>>> top of all three methods. An example might help here.
> >> >> >>>>>
> >> >> >>>>> Thanks
> >> >> >>>>> Eno
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com>
> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Guozhang, Damian thanks for the comments.
> >> >> >>>>>>
> >> >> >>>>>> Giving developers the ability to hook into StateStore recovery
> >> >> >>> phases
> >> >> >>>> was
> >> >> >>>>>> part of my original intent. However the state the KIP is in
> now
> >> >> >>> won't
> >> >> >>>>>> provide this functionality.
> >> >> >>>>>>
> >> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
> >> I'll
> >> >> >>> be
> >> >> >>>>> sure
> >> >> >>>>>> to incorporate all your comments in the new revision.
> >> >> >>>>>>
> >> >> >>>>>> Thanks,
> >> >> >>>>>> Bill
> >> >> >>>>>>
> >> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
> >> damian.guy@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
> >> i.e.,
> >> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also
> >> need
> >> >> >>> to
> >> >> >>>>> have
> >> >> >>>>>>> the end offset available such that people can use it to derive
> >> >> >>> progress.
> >> >> >>>>>>> Slightly different, maybe the StateRestoreContext interface
> >> could
> >> >> >>> be:
> >> >> >>>>>>>
> >> >> >>>>>>> long beginOffset()
> >> >> >>>>>>> long endOffset()
> >> >> >>>>>>> long currentOffset()
> >> >> >>>>>>>
> >> >> >>>>>>> One further thing, this currently doesn't provide developers
> >> the
> >> >> >>>>> ability to
> >> >> >>>>>>> hook into this information if they are using the built-in
> >> >> >>> StateStores.
> >> >> >>>>> Is
> >> >> >>>>>>> this something we should be considering?
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <
> wangguoz@gmail.com>
> >> >> >>> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of
> comments:
> >> >> >>>>>>>>
> >> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only
> once
> >> per
> >> >> >>>> store
> >> >> >>>>>>>> throughout the whole restoration process, and restoreAll is
> >> >> called
> >> >> >>>> per
> >> >> >>>>>>>> batch. In that case I feel we can set the
> StateRestoreContext
> >> as
> >> >> a
> >> >> >>>>> second
> >> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and let
> the
> >> >> >>>> library
> >> >> >>>>> to
> >> >> >>>>>>>> set the corresponding values instead and only let users to
> >> read
> >> >> >>>> (since
> >> >> >>>>>>> the
> >> >> >>>>>>>> collection of key-value pairs do not contain offset
> >> information
> >> >> >>>> anyways
> >> >> >>>>>>>> users cannot really set the offset). The
> "lastOffsetRestored"
> >> >> >>> would
> >> >> >>>> be
> >> >> >>>>>>> the
> >> >> >>>>>>>> starting offset when called on `beginRestore`.
> >> >> >>>>>>>>
> >> >> >>>>>>>> 2) Users who want to implement their own batch restoration
> >> >> >>> callbacks
> >> >> >>>>>>> would
> >> >> >>>>>>>> now need to implement both `restore` and `restoreAll` while
> >> they
> >> >> >>>> either
> >> >> >>>>>>> let
> >> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
> >> >> >>> `restoreAll`
> >> >> >>>>>>> only
> >> >> >>>>>>>> and never call `restore`. Maybe we can provide two abstract
> >> impl
> >> >> >>> of
> >> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
> >> >> >>> endRestore as
> >> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
> >> >> >>> abstract
> >> >> >>>>>>>> `restore` while the other implement `restore` to throw "not
> >> >> >>> supported
> >> >> >>>>>>>> exception" and keep `restoreAll` abstract.
> >> >> >>>>>>>>
> >> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> >> >> >>>>> StateRestoreContext,
> >> >> >>>>>>>> which is important for users to track the restoration
> progress
> >> >> >>> since
> >> >> >>>>>>>> otherwise they could not tell how many percent of
> restoration
> >> has
> >> >> >>>>>>>> completed.  I.e.:
> >> >> >>>>>>>>
> >> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
> >> >> >>>>>>> StateRestoreCallback
> >> >> >>>>>>>> {
> >> >> >>>>>>>>
> >> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
> >> records,
> >> >> >>>>>>>> StateRestoreContext
> >> >> >>>>>>>> restoreContext);
> >> >> >>>>>>>>
> >> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >> >> >>>>>>>>
> >> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >> >> >>>>>>>> }
> >> >> >>>>>>>>
> >> >> >>>>>>>> public interface StateRestoreContext {
> >> >> >>>>>>>>
> >> >> >>>>>>>>  long lastOffsetRestored();
> >> >> >>>>>>>>
> >> >> >>>>>>>>  long endOffsetToRestore();
> >> >> >>>>>>>>
> >> >> >>>>>>>>  int numberRestored();
> >> >> >>>>>>>> }
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>> Guozhang
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
> >> bbejeck@gmail.com>
> >> >> >>>> wrote:
> >> >> >>>>>>>>
> >> >> >>>>>>>>> Guozhang, Matthias,
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA
> title
> >> >> and
> >> >> >>>>>>>>> description as well).
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> I had thought about introducing a separate interface
> >> altogether,
> >> >> >>> but
> >> >> >>>>>>>>> extending the current one makes more sense.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> As for intermediate callbacks based on time or number of
> >> >> >>> records, I
> >> >> >>>>>>> think
> >> >> >>>>>>>>> the latest update to the KIP addresses this point of
> querying
> >> >> for
> >> >> >>>>>>>>> intermediate results, but it would be per batch restored.
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> Thanks,
> >> >> >>>>>>>>> Bill
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
> >> jim@jagunet.com>
> >> >> >>>>> wrote:
> >> >> >>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >> >> >>>>>>> matthias@confluent.io>
> >> >> >>>>>>>>>> wrote:
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>> With regard to backward compatibility, we should not
> change
> >> >> the
> >> >> >>>>>>>> current
> >> >> >>>>>>>>>>> interface, but add a new interface that extends the
> current
> >> >> >>> one.
> >> >> >>>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>> ++1
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>>
> >> >> >>>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>>
> >> >> >>>>>>>> --
> >> >> >>>>>>>> -- Guozhang
> >> >> >>>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>
> >> >> >>>>>
> >> >> >>>>
> >> >> >>>
> >> >> >>>
> >> >> >>>
> >> >> >>> --
> >> >> >>> -- Guozhang
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >
> >> >>
> >> >>
> >> >
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
More specifically, if we replace the first parameter, currently the String
store name, with the store instance itself, would that be sufficient to
cover `StateRestoreNotification`?
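
To make the question concrete, here is a minimal sketch of a listener whose
first parameter is the store instance, so the listener itself can perform the
close / re-open steps described in the quoted use case. Everything in it is
hypothetical: SketchStore, SketchRestoreListener, RecordingStore and
BulkLoadListener are illustrative stand-ins, and the parameter lists are
assumptions. None of it is the Kafka Streams API or the text of the KIP.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a state store; not the Kafka Streams StateStore interface.
interface SketchStore {
    String name();
    void close();
    void open(boolean bulkLoad);
}

// Hypothetical listener: the first parameter is the store instance, not its String name.
interface SketchRestoreListener {
    void onRestoreStart(SketchStore store, long startOffset, long endOffset);
    void onRestoreEnd(SketchStore store, long totalRestored);
}

// Records every call so the close / re-open sequence can be inspected afterwards.
class RecordingStore implements SketchStore {
    final List<String> events = new ArrayList<>();
    private final String name;

    RecordingStore(String name) { this.name = name; }

    public String name() { return name; }
    public void close() { events.add("close"); }
    public void open(boolean bulkLoad) { events.add(bulkLoad ? "open(bulk)" : "open(normal)"); }
}

// With the store instance in hand, the listener can do the setup and teardown itself.
class BulkLoadListener implements SketchRestoreListener {
    public void onRestoreStart(SketchStore store, long startOffset, long endOffset) {
        store.close();
        store.open(true);   // re-open configured for bulk loading
    }

    public void onRestoreEnd(SketchStore store, long totalRestored) {
        store.close();
        store.open(false);  // re-open with the normal configuration
    }
}
```

Under these assumptions, calling onRestoreStart and then onRestoreEnd on a
RecordingStore leaves the event sequence close, open(bulk), close,
open(normal), which is the pattern the separate notification interface was
meant to support.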

On Wed, Jun 21, 2017 at 7:13 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Bill,
>
> I'm wondering why we need the `StateRestoreNotification` while still
> having `StateRestoreListener`. Could the above setup be achieved with just
> `StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
> latter can subsume any use cases intended for the former API.
>
> Guozhang
>
> On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com> wrote:
>
>> I'm going to update the KIP with new interface StateRestoreNotification
>> containing two methods, startRestore and endRestore.
>>
>> While naming is very similar to methods already proposed on the
>> StateRestoreListener, the intent of these methods is not for user
>> notification of restore status.  Instead these new methods are for
>> internal use by the state store to perform any required setup and
>> teardown work due to a batch restoration process.
>>
>> Here's one current use case: when using RocksDB we should optimize for a
>> bulk load by setting Options.prepareForBulkLoad().
>>
>>    1. If the database has already been opened, we'll need to close it, set
>>    the "prepareForBulkLoad" option and re-open the database.
>>    2. Once the restore is completed we'll need to close and re-open the
>>    database with the "prepareForBulkLoad" option turned off.
>>
>> While we are mentioning the RocksDB use case above, the addition of this
>> interface is not tied to any particular implementation of a persistent
>> state store.
>>
>> Additionally, a separate interface is needed so that any user can
>> implement the state restore notification feature regardless of the state
>> restore callback used.
>>
>> I'll also remove the "getStateRestoreListener" method and stick with the
>> notion of a "global" restore listener for now.
>>
>> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com> wrote:
>>
>> > Yes it is, more of an oversight on my part, I'll remove it from the KIP.
>> >
>> >
>> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <
>> matthias@confluent.io>
>> > wrote:
>> >
>> >> Hi,
>> >>
>> >> I think for now it's good enough to start with a single global restore
>> >> listener. We can incrementally improve this later on if required. Of
>> >> course, if it's easy to do right away we can also be more fine grained.
>> >> But for KTable, we might want to add this after getting rid of all the
>> >> overloads we have atm.
>> >>
>> >> One question: what is the purpose of parameter "endOffset" in
>> >> #onRestoreEnd() -- isn't this the same value as provided in
>> >> #onRestoreStart() ?
>> >>
>> >>
>> >> -Matthias
>> >>
>> >>
>> >>
>> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> >> > Thinking about the custom StateRestoreListener approach and having a
>> get
>> >> > method on the interface will really only work for custom state
>> stores.
>> >> >
>> >> > So we'll need to provide another way for users to set behavior with
>> >> > provided state stores.  The only option that comes to mind now is
>> also
>> >> > adding a parameter to the StateStoreSupplier.
>> >> >
>> >> >
>> >> > Bill
>> >> >
>> >> >
>> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com>
>> wrote:
>> >> >
>> >> >> Guozhang,
>> >> >>
>> >> >> Thanks for the comments.
>> >> >>
>> >> >> 1.  As for the granularity, I agree that having one global
>> >> >> StateRestoreListener could be restrictive.  But I think it's
>> important
>> >> to
>> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows
>> users
>> >> to
>> >> >> define an anonymous instance that has access to local scope for
>> >> reporting
>> >> >> purposes.  This is a similar pattern we use for
>> >> >> KafkaStreams.setStateListener.
>> >> >>
>> >> >> As an alternative, what if we add a method to the
>> >> BatchingStateRestoreCallback
>> >> >> interface named "getStateStoreListener".   Then in an abstract
>> adapter
>> >> >> class we return null from getStateStoreListener.   But if users
>> want to
>> >> >> supply a different StateRestoreListener strategy per callback they
>> >> would
>> >> >> simply override the method to return an actual instance.
>> >> >>
>> >> >> WDYT?
>> >> >>
>> >> >> 2.  I'll make the required updates to pass in the ending offset at
>> the
>> >> >> start as well as the actual name of the state store.
>> >> >>
>> >> >> Bill
>> >> >>
>> >> >>
>> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com>
>> >> wrote:
>> >> >>
>> >> >>> Thanks Bill for the updated wiki. I have a couple of more comments:
>> >> >>>
>> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity may
>> >> not be
>> >> >>> sufficient, as in the listener callback we do not know which store it is
>> >> >>> restoring right now: if the topic is a changelog topic then from
>> the
>> >> >>> `TopicPartition` we may be able to infer the state store name, but
>> if
>> >> the
>> >> >>> topic is the source topic read as a KTable then we may not know
>> which
>> >> >>> store
>> >> >>> it is restoring right now; plus forcing users to infer the state
>> store
>> >> >>> name
>> >> >>> from the topic partition name would not be intuitive as well. Plus
>> for
>> >> >>> different stores the listener may be implemented differently, and
>> >> setting
>> >> >>> a
>> >> >>> global listener would force users to branch on the topic-partition
>> >> names,
>> >> >>> similarly to what we did in the global timestamp extractor. On the
>> >> other
>> >> >>> hand, I also agree that setting the listener on the per-store
>> >> granularity
>> >> >>> may be a bit cumbersome since if users want to override it on a
>> >> specific
>> >> >>> store it needs to expose some APIs maybe at StateStoreSupplier. So
>> >> would
>> >> >>> love to hear other people's opinions.
>> >> >>>
>> >> >>> If we think that different implemented restoring callback may be
>> less
>> >> >>> common, then I'd suggest at least replace the `TopicPartition`
>> >> parameter
>> >> >>> with the `String` store name and the `TaskId`?
>> >> >>>
>> >> >>> 2. I think we can pass in the `long endOffset` in the
>> `onRestoreStart`
>> >> >>> function as well, as we will have read the endOffset already by
>> then;
>> >> >>> otherwise users will still not be able to track the restoration
>> >> progress
>> >> >>> (e.g. how much percentage I have been restoring so far, to estimate
>> >> on how
>> >> >>> long I still need to wait).
>> >> >>>
>> >> >>>
>> >> >>> Guozhang
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com>
>> >> wrote:
>> >> >>>
>> >> >>>> Eno,
>> >> >>>>
>> >> >>>> Thanks for the comments.
>> >> >>>>
>> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
>> >> method
>> >> >>> for
>> >> >>>> backward compatibility as that is what is used by current
>> >> implementing
>> >> >>>> classes. However, as I think about it, it makes things cleaner to have
>> a
>> >> >>> single
>> >> >>>> restore method taking a collection. I'll wait for others to weigh
>> in,
>> >> >>> but
>> >> >>>> I'm leaning towards having a single restore method.
>> >> >>>>
>> >> >>>> 2. The "onBatchRestored" method is for keeping track of the
>> restore
>> >> >>> process
>> >> >>>> as we load records from each poll request.
>> >> >>>>
>> >> >>>>    For example if the change log contained 5000 records and
>> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method
>> would
>> >> get
>> >> >>>> called 5 times each time with the ending offset of the last
>> record in
>> >> >>> the
>> >> >>>> batch and the count    of the batch.   I'll update the KIP to add
>> >> >>> comments
>> >> >>>> above the interface methods.
>> >> >>>>
>> >> >>>>
>> >> >>>> Thanks,
>> >> >>>> Bill
>> >> >>>>
>> >> >>>>
>> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
>> >> eno.thereska@gmail.com>
>> >> >>>> wrote:
>> >> >>>>
>> >> >>>>> Thanks Bill,
>> >> >>>>>
>> >> >>>>> A couple of questions:
>> >> >>>>>
>> >> >>>>>
>> >> >>>> 1. why do we need both restore and restoreAll, why can't we just
>> have
>> >> >>> one,
>> >> >>>>> that takes a collection (i.e., restore all)? Are there cases when
>> >> >>> people
>> >> >>>>> want to restore one at a time? In that case, they could still use
>> >> >>>>> restoreAll with just 1 record in the collection right?
>> >> >>>>>
>> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> >> comment
>> >> >>> on
>> >> >>>>> top of all three methods. An example might help here.
>> >> >>>>>
>> >> >>>>> Thanks
>> >> >>>>> Eno
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
>> >> >>>>>>
>> >> >>>>>> Guozhang, Damian thanks for the comments.
>> >> >>>>>>
>> >> >>>>>> Giving developers the ability to hook into StateStore recovery
>> >> >>> phases
>> >> >>>> was
>> >> >>>>>> part of my original intent. However the state the KIP is in now
>> >> >>> won't
>> >> >>>>>> provide this functionality.
>> >> >>>>>>
>> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
>> I'll
>> >> >>> be
>> >> >>>>> sure
>> >> >>>>>> to incorporate all your comments in the new revision.
>> >> >>>>>>
>> >> >>>>>> Thanks,
>> >> >>>>>> Bill
>> >> >>>>>>
>> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <
>> damian.guy@gmail.com>
>> >> >>>> wrote:
>> >> >>>>>>
>> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested,
>> i.e.,
>> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also
>> need
>> >> >>> to
>> >> >>>>> have
>> >> >>>>>>> the end offset available such that people can use it to derive
>> >> >>> progress.
>> >> >>>>>>> Slightly different, maybe the StateRestoreContext interface
>> could
>> >> >>> be:
>> >> >>>>>>>
>> >> >>>>>>> long beginOffset()
>> >> >>>>>>> long endOffset()
>> >> >>>>>>> long currentOffset()
>> >> >>>>>>>
>> >> >>>>>>> One further thing, this currently doesn't provide developers
>> the
>> >> >>>>> ability to
>> >> >>>>>>> hook into this information if they are using the built-in
>> >> >>> StateStores.
>> >> >>>>> Is
>> >> >>>>>>> this something we should be considering?
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
>> >> >>> wrote:
>> >> >>>>>>>
>> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>> >> >>>>>>>>
>> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once
>> per
>> >> >>>> store
>> >> >>>>>>>> throughout the whole restoration process, and restoreAll is
>> >> called
>> >> >>>> per
>> >> >>>>>>>> batch. In that case I feel we can set the StateRestoreContext
>> as
>> >> a
>> >> >>>>> second
>> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and let the
>> >> >>>> library
>> >> >>>>> to
>> >> >>>>>>>> set the corresponding values instead and only let users to
>> read
>> >> >>>> (since
>> >> >>>>>>> the
>> >> >>>>>>>> collection of key-value pairs do not contain offset
>> information
>> >> >>>> anyways
>> >> >>>>>>>> users cannot really set the offset). The "lastOffsetRestored"
>> >> >>> would
>> >> >>>> be
>> >> >>>>>>> the
>> >> >>>>>>>> starting offset when called on `beginRestore`.
>> >> >>>>>>>>
>> >> >>>>>>>> 2) Users who want to implement their own batch restoration
>> >> >>> callbacks
>> >> >>>>>>> would
>> >> >>>>>>>> now need to implement both `restore` and `restoreAll` while
>> they
>> >> >>>> either
>> >> >>>>>>> let
>> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
>> >> >>> `restoreAll`
>> >> >>>>>>> only
>> >> >>>>>>>> and never call `restore`. Maybe we can provide two abstract
>> impl
>> >> >>> of
>> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
>> >> >>> endRestore as
>> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
>> >> >>> abstract
>> >> >>>>>>>> `restore` while the other implement `restore` to throw "not
>> >> >>> supported
>> >> >>>>>>>> exception" and keep `restoreAll` abstract.
>> >> >>>>>>>>
>> >> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >> >>>>> StateRestoreContext,
>> >> >>>>>>>> which is important for users to track the restoration progress
>> >> >>> since
>> >> >>>>>>>> otherwise they could not tell how many percent of restoration
>> has
>> >> >>>>>>>> completed.  I.e.:
>> >> >>>>>>>>
>> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
>> >> >>>>>>> StateRestoreCallback
>> >> >>>>>>>> {
>> >> >>>>>>>>
>> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
>> records,
>> >> >>>>>>>> StateRestoreContext
>> >> >>>>>>>> restoreContext);
>> >> >>>>>>>>
>> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
>> >> >>>>>>>>
>> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
>> >> >>>>>>>> }
>> >> >>>>>>>>
>> >> >>>>>>>> public interface StateRestoreContext {
>> >> >>>>>>>>
>> >> >>>>>>>>  long lastOffsetRestored();
>> >> >>>>>>>>
>> >> >>>>>>>>  long endOffsetToRestore();
>> >> >>>>>>>>
>> >> >>>>>>>>  int numberRestored();
>> >> >>>>>>>> }
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> Guozhang
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <
>> bbejeck@gmail.com>
>> >> >>>> wrote:
>> >> >>>>>>>>
>> >> >>>>>>>>> Guozhang, Matthias,
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA title
>> >> and
>> >> >>>>>>>>> description as well).
>> >> >>>>>>>>>
>> >> >>>>>>>>> I had thought about introducing a separate interface
>> altogether,
>> >> >>> but
>> >> >>>>>>>>> extending the current one makes more sense.
>> >> >>>>>>>>>
>> >> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >> >>> records, I
>> >> >>>>>>> think
>> >> >>>>>>>>> the latest update to the KIP addresses this point of querying
>> >> for
>> >> >>>>>>>>> intermediate results, but it would be per batch restored.
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>> Bill
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
>> jim@jagunet.com>
>> >> >>>>> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> >> >>>>>>> matthias@confluent.io>
>> >> >>>>>>>>>> wrote:
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>> With regard to backward compatibility, we should not change
>> >> the
>> >> >>>>>>>> current
>> >> >>>>>>>>>>> interface, but add a new interface that extends the current
>> >> >>> one.
>> >> >>>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> ++1
>> >> >>>>>>>>>>
>> >> >>>>>>>>>>
>> >> >>>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>>
>> >> >>>>>>>> --
>> >> >>>>>>>> -- Guozhang
>> >> >>>>>>>>
>> >> >>>>>>>
>> >> >>>>>
>> >> >>>>>
>> >> >>>>
>> >> >>>
>> >> >>>
>> >> >>>
>> >> >>> --
>> >> >>> -- Guozhang
>> >> >>>
>> >> >>
>> >> >>
>> >> >
>> >>
>> >>
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
Bill,

I'm wondering why we need the `StateRestoreNotification` while still having
`StateRestoreListener`. Could the above setup be achieved with just
`StateRestoreListener.onRestoreStart / onRestoreEnd`? I.e. it seems the
latter can subsume any use cases intended for the former API.

Guozhang
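
As a rough illustration of the idea, the sketch below has a store implement
the listener itself, so that the start and end callbacks double as the setup
and teardown hooks. All names (RestoreListenerSketch, SelfManagingStore,
RestoreLoopSketch) and all parameter lists are assumptions pieced together
from this thread, not the Kafka Streams API. The bulkLoadMode flag merely
stands in for closing and re-opening the database, and the loop assumes
contiguous offsets starting at 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener shape; the parameter lists are assumptions, not the KIP text.
interface RestoreListenerSketch {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// A store that implements the listener, so start/end act as setup/teardown hooks.
class SelfManagingStore implements RestoreListenerSketch {
    final List<String> log = new ArrayList<>();
    boolean bulkLoadMode = false;

    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        bulkLoadMode = true;   // stands in for close + re-open with bulk-load options
        log.add("start");
    }

    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        log.add("batch:" + numRestored);
    }

    public void onRestoreEnd(String storeName, long totalRestored) {
        bulkLoadMode = false;  // stands in for close + re-open with normal options
        log.add("end:" + totalRestored);
    }
}

// Drives a listener the way a restore loop might: one start, one call per batch, one end.
class RestoreLoopSketch {
    static void restore(RestoreListenerSketch listener, String storeName,
                        long totalRecords, long batchSize) {
        listener.onRestoreStart(storeName, 0L, totalRecords);
        long restored = 0L;
        while (restored < totalRecords) {
            long n = Math.min(batchSize, totalRecords - restored);
            restored += n;
            // Offset of the last record in this batch, assuming offsets start at 0.
            listener.onBatchRestored(storeName, restored - 1, n);
        }
        listener.onRestoreEnd(storeName, restored);
    }
}
```

With 5000 records and a batch size of 1000, as in the example quoted below,
RestoreLoopSketch.restore would invoke onBatchRestored five times between a
single onRestoreStart and a single onRestoreEnd.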

On Mon, Jun 19, 2017 at 3:23 PM, Bill Bejeck <bb...@gmail.com> wrote:

> I'm going to update the KIP with new interface StateRestoreNotification
> containing two methods, startRestore and endRestore.
>
> While naming is very similar to methods already proposed on the
> StateRestoreListener, the intent of these methods is not for user
> notification of restore status.  Instead these new methods are for internal
> use by the state store to perform any required setup and teardown work due
> to a batch restoration process.
>
> Here's one current use case: when using RocksDB we should optimize for a
> bulk load by setting Options.prepareForBulkLoad().
>
>    1. If the database has already been opened, we'll need to close it, set
>    the "prepareForBulkLoad" option and re-open the database.
>    2. Once the restore is completed we'll need to close and re-open the
>    database with the "prepareForBulkLoad" option turned off.
>
> While we are mentioning the RocksDB use case above, the addition of this
> interface is not tied to any particular implementation of a persistent
> state store.
>
> Additionally, a separate interface is needed so that any user can implement
> the state restore notification feature regardless of the state restore
> callback used.
>
> I'll also remove the "getStateRestoreListener" method and stick with the
> notion of a "global" restore listener for now.
>
> On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Yes it is, more of an oversight on my part, I'll remove it from the KIP.
> >
> >
> > On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <matthias@confluent.io
> >
> > wrote:
> >
> >> Hi,
> >>
> >> I think for now it's good enough to start with a single global restore
> >> listener. We can incrementally improve this later on if required. Of
> >> course, if it's easy to do right away we can also be more fine grained.
> >> But for KTable, we might want to add this after getting rid of all the
> >> overloads we have atm.
> >>
> >> One question: what is the purpose of parameter "endOffset" in
> >> #onRestoreEnd() -- isn't this the same value as provided in
> >> #onRestoreStart() ?
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> >> > Thinking about the custom StateRestoreListener approach and having a
> get
> >> > method on the interface will really only work for custom state stores.
> >> >
> >> > So we'll need to provide another way for users to set behavior with
> >> > provided state stores.  The only option that comes to mind now is also
> >> > adding a parameter to the StateStoreSupplier.
> >> >
> >> >
> >> > Bill
> >> >
> >> >
> >> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com>
> wrote:
> >> >
> >> >> Guozhang,
> >> >>
> >> >> Thanks for the comments.
> >> >>
> >> >> 1.  As for the granularity, I agree that having one global
> >> >> StateRestoreListener could be restrictive.  But I think it's
> important
> >> to
> >> >> have a "setStateRestoreListener" on KafkaStreams as this allows users
> >> to
> >> >> define an anonymous instance that has access to local scope for
> >> reporting
> >> >> purposes.  This is a similar pattern we use for
> >> >> KafkaStreams.setStateListener.
> >> >>
> >> >> As an alternative, what if we add a method to the
> >> BatchingStateRestoreCallback
> >> >> interface named "getStateStoreListener".   Then in an abstract
> adapter
> >> >> class we return null from getStateStoreListener.   But if users want
> to
> >> >> supply a different StateRestoreListener strategy per callback they
> >> would
> >> >> simply override the method to return an actual instance.
> >> >>
> >> >> WDYT?
> >> >>
> >> >> 2.  I'll make the required updates to pass in the ending offset at
> the
> >> >> start as well as the actual name of the state store.
> >> >>
> >> >> Bill
> >> >>
> >> >>
> >> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >> >>
> >> >>> Thanks Bill for the updated wiki. I have a couple of more comments:
> >> >>>
> >> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity may
> >> not be
> >> >>> sufficient, as in the listener callback we do not know which store it is
> >> >>> restoring right now: if the topic is a changelog topic then from the
> >> >>> `TopicPartition` we may be able to infer the state store name, but
> if
> >> the
> >> >>> topic is the source topic read as a KTable then we may not know
> which
> >> >>> store
> >> >>> it is restoring right now; plus forcing users to infer the state
> store
> >> >>> name
> >> >>> from the topic partition name would not be intuitive as well. Plus
> for
> >> >>> different stores the listener may be implemented differently, and
> >> setting
> >> >>> a
> >> >>> global listener would force users to branch on the topic-partition
> >> names,
> >> >>> similarly to what we did in the global timestamp extractor. On the
> >> other
> >> >>> hand, I also agree that setting the listener on the per-store
> >> granularity
> >> >>> may be a bit cumbersome since if users want to override it on a
> >> specific
> >> >>> store it needs to expose some APIs maybe at StateStoreSupplier. So
> >> would
> >> >>> love to hear other people's opinions.
> >> >>>
> >> >>> If we think that different implemented restoring callback may be
> less
> >> >>> common, then I'd suggest at least replace the `TopicPartition`
> >> parameter
> >> >>> with the `String` store name and the `TaskId`?
> >> >>>
> >> >>> 2. I think we can pass in the `long endOffset` in the
> `onRestoreStart`
> >> >>> function as well, as we will have read the endOffset already by
> then;
> >> >>> otherwise users can still not be able to track the restoration
> >> progress
> >> >>> (e.g. how much percentage I have been restoring so far, to estimate
> >> on how
> >> >>> long I still need to wait).
> >> >>>
> >> >>>
> >> >>> Guozhang
> >> >>>
> >> >>>
> >> >>>
> >> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com>
> >> wrote:
> >> >>>
> >> >>>> Eno,
> >> >>>>
> >> >>>> Thanks for the comments.
> >> >>>>
> >> >>>> 1. As for having both restore and restoreAll, I kept the restore
> >> method
> >> >>> for
> >> >>>> backward compatibility as that is what is used by current
> >> implementing
> >> >>>> classes. However as I think about it makes things cleaner to have a
> >> >>> single
> >> >>>> restore method taking a collection. I'll wait for others to weigh
> in,
> >> >>> but
> >> >>>> I'm leaning towards having a single restore method.
> >> >>>>
> >> >>>> 2. The "onBatchRestored" method is for keeping track of the restore
> >> >>> process
> >> >>>> as we load records from each poll request.
> >> >>>>
> >> >>>>    For example if the change log contained 5000 records and
> >> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would
> >> get
> >> >>>> called 5 times each time with the ending offset of the last record
> in
> >> >>> the
> >> >>>> batch and the count    of the batch.   I'll update the KIP to add
> >> >>> comments
> >> >>>> above the interface methods.
> >> >>>>
> >> >>>>
> >> >>>> Thanks,
> >> >>>> Bill
> >> >>>>
> >> >>>>
> >> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> >> eno.thereska@gmail.com>
> >> >>>> wrote:
> >> >>>>
> >> >>>>> Thanks Bill,
> >> >>>>>
> >> >>>>> A couple of questions:
> >> >>>>>
> >> >>>>>
> >> >>>> 1. why do we need both restore and restoreAll, why can't we just
> have
> >> >>> one,
> >> >>>>> that takes a collection (i.e., restore all)? Are there cases when
> >> >>> people
> >> >>>>> want to restore one at a time? In that case, they could still use
> >> >>>>> restoreAll with just 1 record in the collection right?
> >> >>>>>
> >> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
> >> comment
> >> >>> on
> >> >>>>> top of all three methods. An example might help here.
> >> >>>>>
> >> >>>>> Thanks
> >> >>>>> Eno
> >> >>>>>
> >> >>>>>
> >> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> >> >>>>>>
> >> >>>>>> Guozhang, Damian thanks for the comments.
> >> >>>>>>
> >> >>>>>> Giving developers the ability to hook into StateStore recovery
> >> >>> phases
> >> >>>> was
> >> >>>>>> part of my original intent. However the state the KIP is in now
> >> >>> won't
> >> >>>>>> provide this functionality.
> >> >>>>>>
> >> >>>>>> As a result I'll be doing a significant revision of KIP-167.
> I'll
> >> >>> be
> >> >>>>> sure
> >> >>>>>> to incorporate all your comments in the new revision.
> >> >>>>>>
> >> >>>>>> Thanks,
> >> >>>>>> Bill
> >> >>>>>>
> >> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <damian.guy@gmail.com
> >
> >> >>>> wrote:
> >> >>>>>>
> >> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
> >> >>>>>>> StateRestoreContext shouldn't have any setters on it and also
> need
> >> >>> to
> >> >>>>> have
> >> >>>>>>> the end offset available such that people can use it to derive
> >> >>> progress.
> >> >>>>>>> Slightly different, maybe the StateRestoreContext interface
> could
> >> >>> be:
> >> >>>>>>>
> >> >>>>>>> long beginOffset()
> >> >>>>>>> long endOffset()
> >> >>>>>>> long currentOffset()
> >> >>>>>>>
> >> >>>>>>> One further thing, this currently doesn't provide developers the
> >> >>>>> ability to
> >> >>>>>>> hook into this information if they are using the built-in
> >> >>> StateStores.
> >> >>>>> Is
> >> >>>>>>> this something we should be considering?
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
> >> >>> wrote:
> >> >>>>>>>
> >> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
> >> >>>>>>>>
> >> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once
> per
> >> >>>> store
> >> >>>>>>>> throughout the whole restoration process, and restoreAll is
> >> called
> >> >>>> per
> >> >>>>>>>> batch. In that case I feel we can set the StateRestoreContext
> as
> >> a
> >> >>>>> second
> >> >>>>>>>> parameter in restoreAll and in endRestore as well, and let the
> >> >>>> library
> >> >>>>> to
> >> >>>>>>>> set the corresponding values instead and only let users to read
> >> >>>> (since
> >> >>>>>>> the
> >> >>>>>>>> collection of key-value pairs do not contain offset information
> >> >>>> anyways
> >> >>>>>>>> users cannot really set the offset). The "lastOffsetRestored"
> >> >>> would
> >> >>>> be
> >> >>>>>>> the
> >> >>>>>>>> starting offset when called on `beginRestore`.
> >> >>>>>>>>
> >> >>>>>>>> 2) Users who want to implement their own batch restoration
> >> >>> callbacks
> >> >>>>>>> would
> >> >>>>>>>> now need to implement both `restore` and `restoreAll` while
> they
> >> >>>> either
> >> >>>>>>> let
> >> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
> >> >>> `restoreAll`
> >> >>>>>>> only
> >> >>>>>>>> and never call `restore`. Maybe we can provide two abstract
> impl
> >> >>> of
> >> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
> >> >>> endRestore as
> >> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
> >> >>> abstract
> >> >>>>>>>> `restore` while the other implement `restore` to throw "not
> >> >>> supported
> >> >>>>>>>> exception" and keep `restoreAll` abstract.
> >> >>>>>>>>
> >> >>>>>>>> 3) I think we can also return the "offset limit" in
> >> >>>>> StateRestoreContext,
> >> >>>>>>>> which is important for users to track the restoration progress
> >> >>> since
> >> >>>>>>>> otherwise they could not tell how many percent of restoration
> has
> >> >>>>>>>> completed.  I.e.:
> >> >>>>>>>>
> >> >>>>>>>> public interface BatchingStateRestoreCallback extends
> >> >>>>>>> StateRestoreCallback
> >> >>>>>>>> {
> >> >>>>>>>>
> >> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>>
> records,
> >> >>>>>>>> StateRestoreContext
> >> >>>>>>>> restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >> >>>>>>>>
> >> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>> public interface StateRestoreContext {
> >> >>>>>>>>
> >> >>>>>>>>  long lastOffsetRestored();
> >> >>>>>>>>
> >> >>>>>>>>  long endOffsetToRestore();
> >> >>>>>>>>
> >> >>>>>>>>  int numberRestored();
> >> >>>>>>>> }
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> Guozhang
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bbejeck@gmail.com
> >
> >> >>>> wrote:
> >> >>>>>>>>
> >> >>>>>>>>> Guozhang, Matthias,
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA title
> >> and
> >> >>>>>>>>> description as well).
> >> >>>>>>>>>
> >> >>>>>>>>> I had thought about introducing a separate interface
> altogether,
> >> >>> but
> >> >>>>>>>>> extending the current one makes more sense.
> >> >>>>>>>>>
> >> >>>>>>>>> As for intermediate callbacks based on time or number of
> >> >>> records, I
> >> >>>>>>> think
> >> >>>>>>>>> the latest update to the KIP addresses this point of querying
> >> for
> >> >>>>>>>>> intermediate results, but it would be per batch restored.
> >> >>>>>>>>>
> >> >>>>>>>>> Thanks,
> >> >>>>>>>>> Bill
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <
> jim@jagunet.com>
> >> >>>>> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >> >>>>>>> matthias@confluent.io>
> >> >>>>>>>>>> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> With regard to backward compatibility, we should not change
> >> the
> >> >>>>>>>> current
> >> >>>>>>>>>>> interface, but add a new interface that extends the current
> >> >>> one.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>> ++1
> >> >>>>>>>>>>
> >> >>>>>>>>>>
> >> >>>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>>
> >> >>>>>>>> --
> >> >>>>>>>> -- Guozhang
> >> >>>>>>>>
> >> >>>>>>>
> >> >>>>>
> >> >>>>>
> >> >>>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> --
> >> >>> -- Guozhang
> >> >>>
> >> >>
> >> >>
> >> >
> >>
> >>
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
I'm going to update the KIP with a new interface, StateRestoreNotification,
containing two methods, startRestore and endRestore.

While the naming is very similar to methods already proposed on the
StateRestoreListener, the intent of these methods is not user
notification of restore status.  Instead, these new methods are for internal
use by the state store to perform any setup and teardown work required by
a batch restoration process.

Here's one current use case: when using RocksDB we should optimize for a
bulk load by setting Options.prepareForBulkLoad().

   1. If the database has already been opened, we'll need to close it, set
   the "prepareForBulkLoad" option and re-open the database.
   2. Once the restore is completed we'll need to close and re-open the
   database with the "prepareForBulkLoad" option turned off.

While the use case above mentions RocksDB, the addition of this
interface is not tied to any particular implementation of a persistent
state store.

Additionally, a separate interface is needed so that any user can implement
the state restore notification feature regardless of the state restore
callback used.
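
To make the setup/teardown idea concrete, here is a minimal sketch of how a
store might react to the two notification methods. Only the names
StateRestoreNotification, startRestore and endRestore come from this thread;
the BulkLoadAwareStore class, its open/close methods and the event log are
hypothetical stand-ins, and no real RocksDB calls are made.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical shape of the interface, using the two method names from this thread.
interface StateRestoreNotification {
    void startRestore();
    void endRestore();
}

// Stand-in for a persistent store. A real RocksDB-backed store would close and
// re-open the database where this class only records an event.
class BulkLoadAwareStore implements StateRestoreNotification {
    private boolean open = false;
    private boolean bulkLoadMode = false;
    private final List<String> events = new ArrayList<>();

    void open(boolean bulkLoad) {
        open = true;
        bulkLoadMode = bulkLoad;
        events.add(bulkLoad ? "open(bulk)" : "open(normal)");
    }

    void close() {
        open = false;
        events.add("close");
    }

    @Override
    public void startRestore() {
        // Step 1: if already opened, close, then re-open tuned for bulk loading.
        if (open) {
            close();
        }
        open(true);
    }

    @Override
    public void endRestore() {
        // Step 2: close and re-open with the bulk-load setting turned off.
        close();
        open(false);
    }

    boolean isBulkLoadMode() { return bulkLoadMode; }

    List<String> events() { return events; }
}

public class RestoreNotificationSketch {
    public static void main(String[] args) {
        BulkLoadAwareStore store = new BulkLoadAwareStore();
        store.open(false);     // store was opened normally before restoration
        store.startRestore();  // library signals that restoration is about to begin
        // ... batches of records would be restored here ...
        store.endRestore();    // library signals that restoration has finished
        // prints [open(normal), close, open(bulk), close, open(normal)]
        System.out.println(store.events());
    }
}
```

Because the notification is its own interface in this sketch, the same store
class could pair it with either a single-record or a batching restore callback.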

I'll also remove the "getStateRestoreListener" method and stick with the
notion of a "global" restore listener for now.
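
As an illustration of what a single "global" listener could look like, here
is a rough sketch that tracks restore progress per store name. The interface
name RestoreProgressListener and all parameter lists are assumptions made for
this example, not the signatures in the KIP; it also assumes contiguous
offsets, an exclusive end offset, and that the batch end offset is the offset
of the last record in the batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical listener shape; the parameter lists are assumptions, not the KIP's.
interface RestoreProgressListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// One global listener that keeps per-store progress keyed by store name.
class PercentTrackingListener implements RestoreProgressListener {
    private final Map<String, long[]> ranges = new HashMap<>();
    private final Map<String, Integer> percent = new HashMap<>();

    @Override
    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        ranges.put(storeName, new long[] {startOffset, endOffset});
        percent.put(storeName, 0);
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        long[] range = ranges.get(storeName);
        if (range == null) {
            return; // no start signal seen for this store
        }
        long total = range[1] - range[0];
        long done = batchEndOffset - range[0] + 1;
        int pct = total <= 0 ? 100 : (int) Math.min(100, done * 100 / total);
        percent.put(storeName, pct);
    }

    @Override
    public void onRestoreEnd(String storeName, long totalRestored) {
        percent.put(storeName, 100);
        ranges.remove(storeName);
    }

    int percentDone(String storeName) {
        return percent.getOrDefault(storeName, 0);
    }
}

public class RestoreProgressSketch {
    public static void main(String[] args) {
        PercentTrackingListener listener = new PercentTrackingListener();
        // 5000 changelog records restored in batches of 1000, as in the example
        // discussed earlier in the thread.
        listener.onRestoreStart("counts", 0L, 5000L);
        for (int batch = 1; batch <= 5; batch++) {
            listener.onBatchRestored("counts", batch * 1000L - 1, 1000L);
            // prints 20%, 40%, 60%, 80%, 100% on successive lines
            System.out.println(listener.percentDone("counts") + "%");
        }
        listener.onRestoreEnd("counts", 5000L);
    }
}
```

Keying on the store name is what lets one listener serve every store without
branching on topic-partition names.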

On Mon, Jun 19, 2017 at 1:05 PM, Bill Bejeck <bb...@gmail.com> wrote:

> Yes it is, more of an oversight on my part, I'll remove it from the KIP.
>
>
> On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
>> Hi,
>>
>> I think for now it's good enough to start with a single global restore
>> listener. We can incrementally improve this later on if required. Of
>> course, if it's easy to do right away we can also be more fine grained.
>> But for KTable, we might want to add this after getting rid of all the
>> overloads we have atm.
>>
>> One question: what is the purpose of parameter "endOffset" in
>> #onRestoreEnd() -- isn't this the same value as provided in
>> #onRestoreStart() ?
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/15/17 6:18 AM, Bill Bejeck wrote:
>> > Thinking about the custom StateRestoreListener approach and having a get
>> > method on the interface will really only work for custom state stores.
>> >
>> > So we'll need to provide another way for users to set behavior with
>> > provided state stores.  The only option that comes to mind now is also
>> > adding a parameter to the StateStoreSupplier.
>> >
>> >
>> > Bill
>> >
>> >
>> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com> wrote:
>> >
>> >> Guozhang,
>> >>
>> >> Thanks for the comments.
>> >>
>> >> 1.  As for the granularity, I agree that having one global
>> >> StateRestoreListener could be restrictive.  But I think it's important
>> to
>> >> have a "setStateRestoreListener" on KafkaStreams as this allows users
>> to
>> >> define an anonymous instance that has access to local scope for
>> reporting
>> >> purposes.  This is a similar pattern we use for
>> >> KafkaStreams.setStateListener.
>> >>
>> >> As an alternative, what if we add a method to the
>> BatchingStateRestoreCallback
>> >> interface named "getStateStoreListener".   Then in an abstract adapter
>> >> class we return null from getStateStoreListener.   But if users want to
>> >> supply a different StateRestoreListener strategy per callback they
>> would
>> >> simply override the method to return an actual instance.
>> >>
>> >> WDYT?
>> >>
>> >> 2.  I'll make the required updates to pass in the ending offset at the
>> >> start as well as the actual name of the state store.
>> >>
>> >> Bill
>> >>
>> >>
>> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >>
>> >>> Thanks Bill for the updated wiki. I have a couple of more comments:
>> >>>
>> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity may
>> not be
>> >>> sufficient, as in the listener callback we do not know which store it is
>> >>> restoring right now: if the topic is a changelog topic then from the
>> >>> `TopicPartition` we may be able to infer the state store name, but if
>> the
>> >>> topic is the source topic read as a KTable then we may not know which
>> >>> store
>> >>> it is restoring right now; plus forcing users to infer the state store
>> >>> name
>> >>> from the topic partition name would not be intuitive as well. Plus for
>> >>> different stores the listener may be implemented differently, and
>> setting
>> >>> a
>> >>> global listener would force users to branch on the topic-partition
>> names,
>> >>> similarly to what we did in the global timestamp extractor. On the
>> other
>> >>> hand, I also agree that setting the listener on the per-store
>> granularity
>> >>> may be a bit cumbersome since if users want to override it on a
>> specific
>> >>> store it needs to expose some APIs maybe at StateStoreSupplier. So
>> would
>> >>> love to hear other people's opinions.
>> >>>
>> >>> If we think that different implemented restoring callback may be less
>> >>> common, then I'd suggest at least replace the `TopicPartition`
>> parameter
>> >>> with the `String` store name and the `TaskId`?
>> >>>
>> >>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
>> >>> function as well, as we will have read the endOffset already by then;
>> >>> otherwise users can still not be able to track the restoration
>> progress
>> >>> (e.g. how much percentage I have been restoring so far, to estimate
>> on how
>> >>> long I still need to wait).
>> >>>
>> >>>
>> >>> Guozhang
>> >>>
>> >>>
>> >>>
>> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com>
>> wrote:
>> >>>
>> >>>> Eno,
>> >>>>
>> >>>> Thanks for the comments.
>> >>>>
>> >>>> 1. As for having both restore and restoreAll, I kept the restore
>> method
>> >>> for
>> >>>> backward compatibility as that is what is used by current
>> implementing
>> >>>> classes. However as I think about it makes things cleaner to have a
>> >>> single
>> >>>> restore method taking a collection. I'll wait for others to weigh in,
>> >>> but
>> >>>> I'm leaning towards having a single restore method.
>> >>>>
>> >>>> 2. The "onBatchRestored" method is for keeping track of the restore
>> >>> process
>> >>>> as we load records from each poll request.
>> >>>>
>> >>>>    For example if the change log contained 5000 records and
>> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would
>> get
>> >>>> called 5 times each time with the ending offset of the last record in
>> >>> the
>> >>>> batch and the count    of the batch.   I'll update the KIP to add
>> >>> comments
>> >>>> above the interface methods.
>> >>>>
>> >>>>
>> >>>> Thanks,
>> >>>> Bill
>> >>>>
>> >>>>
>> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
>> eno.thereska@gmail.com>
>> >>>> wrote:
>> >>>>
>> >>>>> Thanks Bill,
>> >>>>>
>> >>>>> A couple of questions:
>> >>>>>
>> >>>>>
>> >>>> 1. why do we need both restore and restoreAll, why can't we just have
>> >>> one,
>> >>>>> that takes a collection (i.e., restore all)? Are there cases when
>> >>> people
>> >>>>> want to restore one at a time? In that case, they could still use
>> >>>>> restoreAll with just 1 record in the collection right?
>> >>>>>
>> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small
>> comment
>> >>> on
>> >>>>> top of all three methods. An example might help here.
>> >>>>>
>> >>>>> Thanks
>> >>>>> Eno
>> >>>>>
>> >>>>>
>> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Guozhang, Damian thanks for the comments.
>> >>>>>>
>> >>>>>> Giving developers the ability to hook into StateStore recovery
>> >>> phases
>> >>>> was
>> >>>>>> part of my original intent. However the state the KIP is in now
>> >>> won't
>> >>>>>> provide this functionality.
>> >>>>>>
>> >>>>>> As a result I'll be doing a significant revision of KIP-167.  I'll
>> >>> be
>> >>>>> sure
>> >>>>>> to incorporate all your comments in the new revision.
>> >>>>>>
>> >>>>>> Thanks,
>> >>>>>> Bill
>> >>>>>>
>> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
>> >>>> wrote:
>> >>>>>>
>> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>> >>>>>>> StateRestoreContext shouldn't have any setters on it and also need
>> >>> to
>> >>>>> have
>> >>>>>>> the end offset available such that people can use it to derive
>> >>> progress.
>> >>>>>>> Slightly different, maybe the StateRestoreContext interface could
>> >>> be:
>> >>>>>>>
>> >>>>>>> long beginOffset()
>> >>>>>>> long endOffset()
>> >>>>>>> long currentOffset()
>> >>>>>>>
>> >>>>>>> One further thing, this currently doesn't provide developers the
>> >>>>> ability to
>> >>>>>>> hook into this information if they are using the built-in
>> >>> StateStores.
>> >>>>> Is
>> >>>>>>> this something we should be considering?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
>> >>> wrote:
>> >>>>>>>
>> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>> >>>>>>>>
>> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per
>> >>>> store
>> >>>>>>>> throughout the whole restoration process, and restoreAll is
>> called
>> >>>> per
>> >>>>>>>> batch. In that case I feel we can set the StateRestoreContext as
>> a
>> >>>>> second
>> >>>>>>>> parameter in restoreAll and in endRestore as well, and let the
>> >>>> library
>> >>>>> to
>> >>>>>>>> set the corresponding values instead and only let users to read
>> >>>> (since
>> >>>>>>> the
>> >>>>>>>> collection of key-value pairs do not contain offset information
>> >>>> anyways
>> >>>>>>>> users cannot really set the offset). The "lastOffsetRestored"
>> >>> would
>> >>>> be
>> >>>>>>> the
>> >>>>>>>> starting offset when called on `beginRestore`.
>> >>>>>>>>
>> >>>>>>>> 2) Users who want to implement their own batch restoration
>> >>> callbacks
>> >>>>>>> would
>> >>>>>>>> now need to implement both `restore` and `restoreAll` while they
>> >>>> either
>> >>>>>>> let
>> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
>> >>> `restoreAll`
>> >>>>>>> only
>> >>>>>>>> and never call `restore`. Maybe we can provide two abstract impl
>> >>> of
>> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
>> >>> endRestore as
>> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
>> >>> abstract
>> >>>>>>>> `restore` while the other implement `restore` to throw "not
>> >>> supported
>> >>>>>>>> exception" and keep `restoreAll` abstract.
>> >>>>>>>>
>> >>>>>>>> 3) I think we can also return the "offset limit" in
>> >>>>> StateRestoreContext,
>> >>>>>>>> which is important for users to track the restoration progress
>> >>> since
>> >>>>>>>> otherwise they could not tell how many percent of restoration has
>> >>>>>>>> completed.  I.e.:
>> >>>>>>>>
>> >>>>>>>> public interface BatchingStateRestoreCallback extends
>> >>>>>>> StateRestoreCallback
>> >>>>>>>> {
>> >>>>>>>>
>> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
>> >>>>>>>> StateRestoreContext
>> >>>>>>>> restoreContext);
>> >>>>>>>>
>> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
>> >>>>>>>>
>> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>> public interface StateRestoreContext {
>> >>>>>>>>
>> >>>>>>>>  long lastOffsetRestored();
>> >>>>>>>>
>> >>>>>>>>  long endOffsetToRestore();
>> >>>>>>>>
>> >>>>>>>>  int numberRestored();
>> >>>>>>>> }
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Guozhang
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
>> >>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> Guozhang, Matthias,
>> >>>>>>>>>
>> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA title
>> and
>> >>>>>>>>> description as well).
>> >>>>>>>>>
>> >>>>>>>>> I had thought about introducing a separate interface altogether,
>> >>> but
>> >>>>>>>>> extending the current one makes more sense.
>> >>>>>>>>>
>> >>>>>>>>> As for intermediate callbacks based on time or number of
>> >>> records, I
>> >>>>>>> think
>> >>>>>>>>> the latest update to the KIP addresses this point of querying
>> for
>> >>>>>>>>> intermediate results, but it would be per batch restored.
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
>> >>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> >>>>>>> matthias@confluent.io>
>> >>>>>>>>>> wrote:
>> >>>>>>>>>>>
>> >>>>>>>>>>> With regard to backward compatibility, we should not change
>> the
>> >>>>>>>> current
>> >>>>>>>>>>> interface, but add a new interface that extends the current
>> >>> one.
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> ++1
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> --
>> >>>>>>>> -- Guozhang
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> -- Guozhang
>> >>>
>> >>
>> >>
>> >
>>
>>
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Yes it is; that was more of an oversight on my part. I'll remove it from the KIP.


On Mon, Jun 19, 2017 at 12:48 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Hi,
>
> I think for now it's good enough to start with a single global restore
> listener. We can incrementally improve this later on if required. Of
> course, if it's easy to do right away we can also be more fine grained.
> But for KTable, we might want to add this after getting rid of all the
> overloads we have atm.
>
> One question: what is the purpose of parameter "endOffset" in
> #onRestoreEnd() -- isn't this the same value as provided in
> #onRestoreStart() ?
>
>
> -Matthias
>
>
>
> On 6/15/17 6:18 AM, Bill Bejeck wrote:
> > Thinking about the custom StateRestoreListener approach and having a get
> > method on the interface will really only work for custom state stores.
> >
> > So we'll need to provide another way for users to set behavior with
> > provided state stores.  The only option that comes to mind now is also
> > adding a parameter to the StateStoreSupplier.
> >
> >
> > Bill
> >
> >
> > On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com> wrote:
> >
> >> Guozhang,
> >>
> >> Thanks for the comments.
> >>
> >> 1.  As for the granularity, I agree that having one global
> >> StateRestoreListener could be restrictive.  But I think it's important
> to
> >> have a "setStateRestoreListener" on KafkaStreams as this allows users to
> >> define an anonymous instance that has access to local scope for
> reporting
> >> purposes.  This is a similar pattern we use for
> >> KafkaStreams.setStateListener.
> >>
> >> As an alternative, what if we add a method to the
> BatchingStateRestoreCallback
> >> interface named "getStateStoreListener".   Then in an abstract adapter
> >> class we return null from getStateStoreListener.   But if users want to
> >> supply a different StateRestoreListener strategy per callback they would
> >> simply override the method to return an actual instance.
> >>
> >> WDYT?
> >>
> >> 2.  I'll make the required updates to pass in the ending offset at the
> >> start as well as the actual name of the state store.
> >>
> >> Bill
> >>
> >>
> >> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com>
> wrote:
> >>
> >>> Thanks Bill for the updated wiki. I have a couple of more comments:
> >>>
> >>> 1. Setting StateRestoreListener on the KafkaStreams granularity may
> not be
> >>> sufficient, as in the listener callback we do not know which store it is
> >>> restoring right now: if the topic is a changelog topic then from the
> >>> `TopicPartition` we may be able to infer the state store name, but if
> the
> >>> topic is the source topic read as a KTable then we may not know which
> >>> store
> >>> it is restoring right now; plus forcing users to infer the state store
> >>> name
> >>> from the topic partition name would not be intuitive as well. Plus for
> >>> different stores the listener may be implemented differently, and
> setting
> >>> a
> >>> global listener would force users to branch on the topic-partition
> names,
> >>> similarly to what we did in the global timestamp extractor. On the
> other
> >>> hand, I also agree that setting the listener on the per-store
> granularity
> >>> may be a bit cumbersome since if users want to override it on a
> specific
> >>> store it needs to expose some APIs maybe at StateStoreSupplier. So
> would
> >>> love to hear other people's opinions.
> >>>
> >>> If we think that different implemented restoring callback may be less
> >>> common, then I'd suggest at least replace the `TopicPartition`
> parameter
> >>> with the `String` store name and the `TaskId`?
> >>>
> >>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
> >>> function as well, as we will have read the endOffset already by then;
> >>> otherwise users can still not be able to track the restoration progress
> >>> (e.g. how much percentage I have been restoring so far, to estimate on
> how
> >>> long I still need to wait).
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com>
> wrote:
> >>>
> >>>> Eno,
> >>>>
> >>>> Thanks for the comments.
> >>>>
> >>>> 1. As for having both restore and restoreAll, I kept the restore
> method
> >>> for
> >>>> backward compatibility as that is what is used by current implementing
> >>>> classes. However as I think about it makes things cleaner to have a
> >>> single
> >>>> restore method taking a collection. I'll wait for others to weigh in,
> >>> but
> >>>> I'm leaning towards having a single restore method.
> >>>>
> >>>> 2. The "onBatchRestored" method is for keeping track of the restore
> >>> process
> >>>> as we load records from each poll request.
> >>>>
> >>>>    For example if the change log contained 5000 records and
> >>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would
> get
> >>>> called 5 times each time with the ending offset of the last record in
> >>> the
> >>>> batch and the count    of the batch.   I'll update the KIP to add
> >>> comments
> >>>> above the interface methods.
> >>>>
> >>>>
> >>>> Thanks,
> >>>> Bill
> >>>>
> >>>>
> >>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <
> eno.thereska@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Thanks Bill,
> >>>>>
> >>>>> A couple of questions:
> >>>>>
> >>>>>
> >>>> 1. why do we need both restore and restoreAll, why can't we just have
> >>> one,
> >>>>> that takes a collection (i.e., restore all)? Are there cases when
> >>> people
> >>>>> want to restore one at a time? In that case, they could still use
> >>>>> restoreAll with just 1 record in the collection right?
> >>>>>
> >>>>> 2. I don't quite get "onBatchRestored". Could you put a small comment
> >>> on
> >>>>> top of all three methods. An example might help here.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>
> >>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> >>>>>>
> >>>>>> Guozhang, Damian thanks for the comments.
> >>>>>>
> >>>>>> Giving developers the ability to hook into StateStore recovery
> >>> phases
> >>>> was
> >>>>>> part of my original intent. However the state the KIP is in now
> >>> won't
> >>>>>> provide this functionality.
> >>>>>>
> >>>>>> As a result I'll be doing a significant revision of KIP-167.  I'll
> >>> be
> >>>>> sure
> >>>>>> to incorporate all your comments in the new revision.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Bill
> >>>>>>
> >>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
> >>>> wrote:
> >>>>>>
> >>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
> >>>>>>> StateRestoreContext shouldn't have any setters on it and also need
> >>> to
> >>>>> have
> >>>>>>> the end offset available such that people can use it to derive
> >>> progress.
> >>>>>>> Slightly different, maybe the StateRestoreContext interface could
> >>> be:
> >>>>>>>
> >>>>>>> long beginOffset()
> >>>>>>> long endOffset()
> >>>>>>> long currentOffset()
> >>>>>>>
> >>>>>>> One further thing, this currently doesn't provide developers the
> >>>>> ability to
> >>>>>>> hook into this information if they are using the built-in
> >>> StateStores.
> >>>>> Is
> >>>>>>> this something we should be considering?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
> >>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
> >>>>>>>>
> >>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per
> >>>> store
> >>>>>>>> throughout the whole restoration process, and restoreAll is called
> >>>> per
> >>>>>>>> batch. In that case I feel we can set the StateRestoreContext as a
> >>>>> second
> >>>>>>>> parameter in restoreAll and in endRestore as well, and let the
> >>>> library
> >>>>> to
> >>>>>>>> set the corresponding values instead and only let users to read
> >>>> (since
> >>>>>>> the
> >>>>>>>> collection of key-value pairs do not contain offset information
> >>>> anyways
> >>>>>>>> users cannot really set the offset). The "lastOffsetRestored"
> >>> would
> >>>> be
> >>>>>>> the
> >>>>>>>> starting offset when called on `beginRestore`.
> >>>>>>>>
> >>>>>>>> 2) Users who want to implement their own batch restoration
> >>> callbacks
> >>>>>>> would
> >>>>>>>> now need to implement both `restore` and `restoreAll` while they
> >>>> either
> >>>>>>> let
> >>>>>>>> `restoreAll` to call `restore` or implement the logic in
> >>> `restoreAll`
> >>>>>>> only
> >>>>>>>> and never call `restore`. Maybe we can provide two abstract impl
> >>> of
> >>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
> >>> endRestore as
> >>>>>>>> no-ops, with one callback implementing `restoreAll` to call
> >>> abstract
> >>>>>>>> `restore` while the other implement `restore` to throw "not
> >>> supported
> >>>>>>>> exception" and keep `restoreAll` abstract.
> >>>>>>>>
> >>>>>>>> 3) I think we can also return the "offset limit" in
> >>>>> StateRestoreContext,
> >>>>>>>> which is important for users to track the restoration progress
> >>> since
> >>>>>>>> otherwise they could not tell how many percent of restoration has
> >>>>>>>> completed.  I.e.:
> >>>>>>>>
> >>>>>>>> public interface BatchingStateRestoreCallback extends
> >>>>>>> StateRestoreCallback
> >>>>>>>> {
> >>>>>>>>
> >>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> >>>>>>>> StateRestoreContext
> >>>>>>>> restoreContext);
> >>>>>>>>
> >>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
> >>>>>>>>
> >>>>>>>>   void endRestore(StateRestoreContext restoreContext);
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> public interface StateRestoreContext {
> >>>>>>>>
> >>>>>>>>  long lastOffsetRestored();
> >>>>>>>>
> >>>>>>>>  long endOffsetToRestore();
> >>>>>>>>
> >>>>>>>>  int numberRestored();
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>>> Guozhang, Matthias,
> >>>>>>>>>
> >>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
> >>>>>>>>> description as well).
> >>>>>>>>>
> >>>>>>>>> I had thought about introducing a separate interface altogether,
> >>> but
> >>>>>>>>> extending the current one makes more sense.
> >>>>>>>>>
> >>>>>>>>> As for intermediate callbacks based on time or number of
> >>> records, I
> >>>>>>> think
> >>>>>>>>> the latest update to the KIP addresses this point of querying for
> >>>>>>>>> intermediate results, but it would be per batch restored.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Bill
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
> >>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >>>>>>> matthias@confluent.io>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> With regard to backward compatibility, we should not change the
> >>>>>>>> current
> >>>>>>>>>>> interface, but add a new interface that extends the current
> >>> one.
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> ++1
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Hi,

I think for now it's good enough to start with a single global restore
listener. We can incrementally improve this later on if required. Of
course, if it's easy to do right away we can also be more fine-grained.
But for KTable, we might want to add this after getting rid of all the
overloads we have atm.

One question: what is the purpose of parameter "endOffset" in
#onRestoreEnd() -- isn't this the same value as provided in
#onRestoreStart() ?


-Matthias



On 6/15/17 6:18 AM, Bill Bejeck wrote:
> Thinking about the custom StateRestoreListener approach and having a get
> method on the interface will really only work for custom state stores.
> 
> So we'll need to provide another way for users to set behavior with
> provided state stores.  The only option that comes to mind now is also
> adding a parameter to the StateStoreSupplier.
> 
> 
> Bill
> 
> 
> On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com> wrote:
> 
>> Guozhang,
>>
>> Thanks for the comments.
>>
>> 1.  As for the granularity, I agree that having one global
>> StateRestoreListener could be restrictive.  But I think it's important to
>> have a "setStateRestoreListener" on KafkaStreams as this allows users to
>> define an anonymous instance that has access to local scope for reporting
>> purposes.  This is a similar pattern we use for
>> KafkaStreams.setStateListener.
>>
>> As an alternative, what if we add a method to the BatchingStateRestoreCallback
>> interface named "getStateStoreListener".   Then in an abstract adapter
>> class we return null from getStateStoreListener.   But if users want to
>> supply a different StateRestoreListener strategy per callback they would
>> simply override the method to return an actual instance.
>>
>> WDYT?
>>
>> 2.  I'll make the required updates to pass in the ending offset at the
>> start as well as the actual name of the state store.
>>
>> Bill
>>
>>
>> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Thanks Bill for the updated wiki. I have a couple of more comments:
>>>
>>> 1. Setting StateRestoreListener on the KafkaStreams granularity may not be
>>> sufficient, as in the listener callback we do not know which store it is
>>> restoring right now: if the topic is a changelog topic then from the
>>> `TopicPartition` we may be able to infer the state store name, but if the
>>> topic is the source topic read as a KTable then we may not know which
>>> store
>>> it is restoring right now; plus forcing users to infer the state store
>>> name
>>> from the topic partition name would not be intuitive as well. Plus for
>>> different stores the listener may be implemented differently, and setting
>>> a
>>> global listener would force users to branch on the topic-partition names,
>>> similarly to what we did in the global timestamp extractor. On the other
>>> hand, I also agree that setting the listener on the per-store granularity
>>> may be a bit cumbersome since if users want to override it on a specific
>>> store it needs to expose some APIs maybe at StateStoreSupplier. So would
>>> love to hear other people's opinions.
>>>
>>> If we think that different implemented restoring callback may be less
>>> common, then I'd suggest at least replace the `TopicPartition` parameter
>>> with the `String` store name and the `TaskId`?
>>>
>>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
>>> function as well, as we will have read the endOffset already by then;
>>> otherwise users can still not be able to track the restoration progress
>>> (e.g. how much percentage I have been restoring so far, to estimate on how
>>> long I still need to wait).
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com> wrote:
>>>
>>>> Eno,
>>>>
>>>> Thanks for the comments.
>>>>
>>>> 1. As for having both restore and restoreAll, I kept the restore method
>>> for
>>>> backward compatibility as that is what is used by current implementing
>>>> classes. However as I think about it makes things cleaner to have a
>>> single
>>>> restore method taking a collection. I'll wait for others to weigh in,
>>> but
>>>> I'm leaning towards having a single restore method.
>>>>
>>>> 2. The "onBatchRestored" method is for keeping track of the restore
>>> process
>>>> as we load records from each poll request.
>>>>
>>>>    For example if the change log contained 5000 records and
>>>> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
>>>> called 5 times each time with the ending offset of the last record in
>>> the
>>>> batch and the count    of the batch.   I'll update the KIP to add
>>> comments
>>>> above the interface methods.
>>>>
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>>
>>>> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <en...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Bill,
>>>>>
>>>>> A couple of questions:
>>>>>
>>>>>
>>>> 1. why do we need both restore and restoreAll, why can't we just have
>>> one,
>>>>> that takes a collection (i.e., restore all)? Are there cases when
>>> people
>>>>> want to restore one at a time? In that case, they could still use
>>>>> restoreAll with just 1 record in the collection right?
>>>>>
>>>>> 2. I don't quite get "onBatchRestored". Could you put a small comment
>>> on
>>>>> top of all three methods. An example might help here.
>>>>>
>>>>> Thanks
>>>>> Eno
>>>>>
>>>>>
>>>>>> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
>>>>>>
>>>>>> Guozhang, Damian thanks for the comments.
>>>>>>
>>>>>> Giving developers the ability to hook into StateStore recovery
>>> phases
>>>> was
>>>>>> part of my original intent. However the state the KIP is in now
>>> won't
>>>>>> provide this functionality.
>>>>>>
>>>>>> As a result I'll be doing a significant revision of KIP-167.  I'll
>>> be
>>>>> sure
>>>>>> to incorporate all your comments in the new revision.
>>>>>>
>>>>>> Thanks,
>>>>>> Bill
>>>>>>
>>>>>> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>>>>>>> StateRestoreContext shouldn't have any setters on it and also need
>>> to
>>>>> have
>>>>>>> the end offset available such that people can use it derive
>>> progress.
>>>>>>> Slightly different, maybe the StateRestoreContext interface could
>>> be:
>>>>>>>
>>>>>>> long beginOffset()
>>>>>>> long endOffset()
>>>>>>> long currentOffset()
>>>>>>>
>>>>>>> One further thing, this currently doesn't provide developers the
>>>>> ability to
>>>>>>> hook into this information if they are using the built-in
>>> StateStores.
>>>>> Is
>>>>>>> this something we should be considering?
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the updated KIP Bill, I have a couple of comments:
>>>>>>>>
>>>>>>>> 1) I'm assuming beginRestore / endRestore is called only once per
>>>> store
>>>>>>>> throughout the whole restoration process, and restoreAll is called
>>>> per
>>>>>>>> batch. In that case I feel we can set the StateRestoreContext as a
>>>>> second
>>>>>>>> parameter in restoreAll and in endRestore as well, and let the
>>>> library
>>>>> to
>>>>>>>> set the corresponding values instead and only let users to read
>>>> (since
>>>>>>> the
>>>>>>>> collection of key-value pairs do not contain offset information
>>>> anyways
>>>>>>>> users cannot really set the offset). The "lastOffsetRestored"
>>> would
>>>> be
>>>>>>> the
>>>>>>>> starting offset when called on `beginRestore`.
>>>>>>>>
>>>>>>>> 2) Users who wants to implement their own batch restoration
>>> callbacks
>>>>>>> would
>>>>>>>> now need to implement both `restore` and `restoreAll` while they
>>>> either
>>>>>>> let
>>>>>>>> `restoreAll` to call `restore` or implement the logic in
>>> `restoreAll`
>>>>>>> only
>>>>>>>> and never call `restore`. Maybe we can provide two abstract impl
>>> of
>>>>>>>> BatchingStateRestoreCallbacks which does beginRestore /
>>> endRestore as
>>>>>>>> no-ops, with one callback implementing `restoreAll` to call
>>> abstract
>>>>>>>> `restore` while the other implement `restore` to throw "not
>>> supported
>>>>>>>> exception" and keep `restoreAll` abstract.
>>>>>>>>
>>>>>>>> 3) I think we can also return the "offset limit" in
>>>>> StateRestoreContext,
>>>>>>>> which is important for users to track the restoration progress
>>> since
>>>>>>>> otherwise they could not tell how many percent of restoration has
>>>>>>>> completed.  I.e.:
>>>>>>>>
>>>>>>>> public interface BatchingStateRestoreCallback extends
>>>>>>> StateRestoreCallback
>>>>>>>> {
>>>>>>>>
>>>>>>>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
>>>>>>>> StateRestoreContext
>>>>>>>> restoreContext);
>>>>>>>>
>>>>>>>>   void beginRestore(StateRestoreContext restoreContext);
>>>>>>>>
>>>>>>>>   void endRestore(StateRestoreContext restoreContext);
>>>>>>>> }
>>>>>>>>
>>>>>>>> public interface StateRestoreContext {
>>>>>>>>
>>>>>>>>  long lastOffsetRestored();
>>>>>>>>
>>>>>>>>  long endOffsetToRestore();
>>>>>>>>
>>>>>>>>  int numberRestored();
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
>>>> wrote:
>>>>>>>>
>>>>>>>>> Guozhang, Matthias,
>>>>>>>>>
>>>>>>>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
>>>>>>>>> description as well).
>>>>>>>>>
>>>>>>>>> I had thought about introducing a separate interface altogether,
>>> but
>>>>>>>>> extending the current one makes more sense.
>>>>>>>>>
>>>>>>>>> As for intermediate callbacks based on time or number of
>>> records, I
>>>>>>> think
>>>>>>>>> the latest update to the KIP addresses this point of querying for
>>>>>>>>> intermediate results, but it would be per batch restored.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>>>>>>> matthias@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> With regard to backward compatibility, we should not change the
>>>>>>>> current
>>>>>>>>>>> interface, but add a new interface that extends the current
>>> one.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> ++1
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
> 


Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Thinking more about the custom StateRestoreListener approach: having a get
method on the interface will really only work for custom state stores.

So we'll need to provide another way for users to set behavior with
provided state stores.  The only option that comes to mind now is also
adding a parameter to the StateStoreSupplier.


Bill


On Wed, Jun 14, 2017 at 5:39 PM, Bill Bejeck <bb...@gmail.com> wrote:

> Guozhang,
>
> Thanks for the comments.
>
> 1.  As for the granularity, I agree that having one global
> StateRestoreListener could be restrictive.  But I think it's important to
> have a "setStateRestoreListener" on KafkaStreams as this allows users to
> define an anonymous instance that has access to local scope for reporting
> purposes.  This is a similar pattern we use for
> KafkaStreams.setStateListener.
>
> As an alternative, what if we add a method to the BatchingStateRestoreCallback
> interface named "getStateStoreListener".   Then in an abstract adapter
> class we return null from getStateStoreListener.   But if users want to
> supply a different StateRestoreListener strategy per callback they would
> simply override the method to return an actual instance.
>
> WDYT?
>
> 2.  I'll make the required updates to pass in the ending offset at the
> start as well as the actual name of the state store.
>
> Bill
>
>
> On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
>> Thanks Bill for the updated wiki. I have a couple of more comments:
>>
>> 1. Setting StateRestoreListener on the KafkaStreams granularity may not be
>> sufficient, as in the listener callback we do not know which store it is
>> restoring right now: if the topic is a changelog topic then from the
>> `TopicPartition` we may be able to infer the state store name, but if the
>> topic is the source topic read as a KTable then we may not know which
>> store
>> it is restoring right now; plus forcing users to infer the state store
>> name
>> from the topic partition name would not be intuitive as well. Plus for
>> different stores the listener may be implemented differently, and setting
>> a
>> global listener would force users to branch on the topic-partition names,
>> similarly to what we did in the global timestamp extractor. On the other
>> hand, I also agree that setting the listener on the per-store granularity
>> may be a bit cumbersome since if users want to override it on a specific
>> store it needs to expose some APIs maybe at StateStoreSupplier. So would
>> love to hear other people's opinions.
>>
>> If we think that different implemented restoring callback may be less
>> common, then I'd suggest at least replace the `TopicPartition` parameter
>> with the `String` store name and the `TaskId`?
>>
>> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
>> function as well, as we will have read the endOffset already by then;
>> otherwise users can still not be able to track the restoration progress
>> (e.g. how much percentage I have been restoring so far, to estimate on how
>> long I still need to wait).
>>
>>
>> Guozhang
>>
>>
>>
>> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com> wrote:
>>
>> > Eno,
>> >
>> > Thanks for the comments.
>> >
>> > 1. As for having both restore and restoreAll, I kept the restore method
>> for
>> > backward compatibility as that is what is used by current implementing
>> > classes. However as I think about it makes things cleaner to have a
>> single
>> > restore method taking a collection. I'll wait for others to weigh in,
>> but
>> > I'm leaning towards having a single restore method.
>> >
>> > 2. The "onBatchRestored" method is for keeping track of the restore
>> process
>> > as we load records from each poll request.
>> >
>> >    For example if the change log contained 5000 records and
>> > MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
>> > called 5 times each time with the ending offset of the last record in
>> the
>> > batch and the count    of the batch.   I'll update the KIP to add
>> comments
>> > above the interface methods.
>> >
>> >
>> > Thanks,
>> > Bill
>> >
>> >
>> > On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <en...@gmail.com>
>> > wrote:
>> >
>> > > Thanks Bill,
>> > >
>> > > A couple of questions:
>> > >
>> > >
>> > 1. why do we need both restore and restoreAll, why can't we just have
>> one,
>> > > that takes a collection (i.e., restore all)? Are there cases when
>> people
>> > > want to restore one at a time? In that case, they could still use
>> > > restoreAll with just 1 record in the collection right?
>> > >
>> > > 2. I don't quite get "onBatchRestored". Could you put a small comment
>> on
>> > > top of all three methods. An example might help here.
>> > >
>> > > Thanks
>> > > Eno
>> > >
>> > >
>> > > > On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
>> > > >
>> > > > Guozhang, Damian thanks for the comments.
>> > > >
>> > > > Giving developers the ability to hook into StateStore recovery
>> phases
>> > was
>> > > > part of my original intent. However the state the KIP is in now
>> won't
>> > > > provide this functionality.
>> > > >
>> > > > As a result I'll be doing a significant revision of KIP-167.  I'll
>> be
>> > > sure
>> > > > to incorporate all your comments in the new revision.
>> > > >
>> > > > Thanks,
>> > > > Bill
>> > > >
>> > > > On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
>> > wrote:
>> > > >
>> > > >> I'm largely in agreement with what Guozhang has suggested, i.e.,
>> > > >> StateRestoreContext shouldn't have any setters on it and also need
>> to
>> > > have
>> > > >> the end offset available such that people can use it derive
>> progress.
>> > > >> Slightly different, maybe the StateRestoreContext interface could
>> be:
>> > > >>
>> > > >> long beginOffset()
>> > > >> long endOffset()
>> > > >> long currentOffset()
>> > > >>
>> > > >> One further thing, this currently doesn't provide developers the
>> > > ability to
>> > > >> hook into this information if they are using the built-in
>> StateStores.
>> > > Is
>> > > >> this something we should be considering?
>> > > >>
>> > > >>
>> > > >> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
>> wrote:
>> > > >>
>> > > >>> Thanks for the updated KIP Bill, I have a couple of comments:
>> > > >>>
>> > > >>> 1) I'm assuming beginRestore / endRestore is called only once per
>> > store
>> > > >>> throughout the whole restoration process, and restoreAll is called
>> > per
>> > > >>> batch. In that case I feel we can set the StateRestoreContext as a
>> > > second
>> > > >>> parameter in restoreAll and in endRestore as well, and let the
>> > library
>> > > to
>> > > >>> set the corresponding values instead and only let users to read
>> > (since
>> > > >> the
>> > > >>> collection of key-value pairs do not contain offset information
>> > anyways
>> > > >>> users cannot really set the offset). The "lastOffsetRestored"
>> would
>> > be
>> > > >> the
>> > > >>> starting offset when called on `beginRestore`.
>> > > >>>
>> > > >>> 2) Users who wants to implement their own batch restoration
>> callbacks
>> > > >> would
>> > > >>> now need to implement both `restore` and `restoreAll` while they
>> > either
>> > > >> let
>> > > >>> `restoreAll` to call `restore` or implement the logic in
>> `restoreAll`
>> > > >> only
>> > > >>> and never call `restore`. Maybe we can provide two abstract impl
>> of
>> > > >>> BatchingStateRestoreCallbacks which does beginRestore /
>> endRestore as
>> > > >>> no-ops, with one callback implementing `restoreAll` to call
>> abstract
>> > > >>> `restore` while the other implement `restore` to throw "not
>> supported
>> > > >>> exception" and keep `restoreAll` abstract.
>> > > >>>
>> > > >>> 3) I think we can also return the "offset limit" in
>> > > StateRestoreContext,
>> > > >>> which is important for users to track the restoration progress
>> since
>> > > >>> otherwise they could not tell how many percent of restoration has
>> > > >>> completed.  I.e.:
>> > > >>>
>> > > >>> public interface BatchingStateRestoreCallback extends
>> > > >> StateRestoreCallback
>> > > >>> {
>> > > >>>
>> > > >>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
>> > > >>> StateRestoreContext
>> > > >>> restoreContext);
>> > > >>>
>> > > >>>   void beginRestore(StateRestoreContext restoreContext);
>> > > >>>
>> > > >>>   void endRestore(StateRestoreContext restoreContext);
>> > > >>> }
>> > > >>>
>> > > >>> public interface StateRestoreContext {
>> > > >>>
>> > > >>>  long lastOffsetRestored();
>> > > >>>
>> > > >>>  long endOffsetToRestore();
>> > > >>>
>> > > >>>  int numberRestored();
>> > > >>> }
>> > > >>>
>> > > >>>
>> > > >>> Guozhang
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
>> > wrote:
>> > > >>>
>> > > >>>> Guozhang, Matthias,
>> > > >>>>
>> > > >>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
>> > > >>>> description as well).
>> > > >>>>
>> > > >>>> I had thought about introducing a separate interface altogether,
>> but
>> > > >>>> extending the current one makes more sense.
>> > > >>>>
>> > > >>>> As for intermediate callbacks based on time or number of
>> records, I
>> > > >> think
>> > > >>>> the latest update to the KIP addresses this point of querying for
>> > > >>>> intermediate results, but it would be per batch restored.
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Bill
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>>
>> > > >>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
>> > > wrote:
>> > > >>>>
>> > > >>>>>
>> > > >>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> > > >> matthias@confluent.io>
>> > > >>>>> wrote:
>> > > >>>>>>
>> > > >>>>>> With regard to backward compatibility, we should not change the
>> > > >>> current
>> > > >>>>>> interface, but add a new interface that extends the current
>> one.
>> > > >>>>>>
>> > > >>>>>
>> > > >>>>> ++1
>> > > >>>>>
>> > > >>>>>
>> > > >>>>
>> > > >>>
>> > > >>>
>> > > >>>
>> > > >>> --
>> > > >>> -- Guozhang
>> > > >>>
>> > > >>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Guozhang,

Thanks for the comments.

1.  As for the granularity, I agree that having one global
StateRestoreListener could be restrictive.  But I think it's important to
have a "setStateRestoreListener" on KafkaStreams as this allows users to
define an anonymous instance that has access to local scope for reporting
purposes.  This is similar to the pattern we use for
KafkaStreams.setStateListener.

As an alternative, what if we add a method named "getStateStoreListener" to
the BatchingStateRestoreCallback interface?  Then in an abstract adapter
class we return null from getStateStoreListener.  But if users want to
supply a different StateRestoreListener strategy per callback, they would
simply override the method to return an actual instance.

WDYT?

2.  I'll make the required updates to pass in the ending offset at the
start as well as the actual name of the state store.
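
A minimal sketch of the per-callback alternative in point 1, to make the
null-returning adapter concrete.  Everything here is an assumption for
illustration: the interfaces are self-contained stand-ins rather than the
real Kafka Streams types, the parameter lists are guesses, and the
RestoreListeners.resolve helper is hypothetical (it only shows how the
library might fall back to the global listener when a callback returns null).

```java
import java.util.Collection;
import java.util.Map;

// Hypothetical stand-in for the listener discussed in this thread; the
// parameter lists are assumptions, not the final KIP-167 signatures.
interface StateRestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
    void onRestoreEnd(String storeName, long totalRestored);
}

// Hypothetical batching callback carrying the proposed accessor.
interface BatchingStateRestoreCallback {
    void restoreAll(Collection<Map.Entry<byte[], byte[]>> records);
    StateRestoreListener getStateStoreListener();
}

// Abstract adapter: by default there is no per-callback listener.
abstract class BatchingStateRestoreCallbackAdapter implements BatchingStateRestoreCallback {
    @Override
    public StateRestoreListener getStateStoreListener() {
        return null;
    }
}

// Illustrative helper (not part of any proposal): prefer the callback's own
// listener and fall back to the instance-wide one.
final class RestoreListeners {
    private RestoreListeners() { }

    static StateRestoreListener resolve(BatchingStateRestoreCallback callback,
                                        StateRestoreListener globalListener) {
        StateRestoreListener own = callback.getStateStoreListener();
        return own != null ? own : globalListener;
    }
}
```

With this shape, a callback that extends the adapter and does not override
getStateStoreListener keeps using whatever was registered globally, while a
callback that overrides it gets its own reporting strategy.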

Bill


On Wed, Jun 14, 2017 at 3:53 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Thanks Bill for the updated wiki. I have a couple of more comments:
>
> 1. Setting StateRestoreListener on the KafkaStreams granularity may not be
> sufficient, as in the listener callback we do not know which store it is
> restoring right now: if the topic is a changelog topic then from the
> `TopicPartition` we may be able to infer the state store name, but if the
> topic is the source topic read as a KTable then we may not know which store
> it is restoring right now; plus forcing users to infer the state store name
> from the topic partition name would not be intuitive as well. Plus for
> different stores the listener may be implemented differently, and setting a
> global listener would force users to branch on the topic-partition names,
> similarly to what we did in the global timestamp extractor. On the other
> hand, I also agree that setting the listener on the per-store granularity
> may be a bit cumbersome since if users want to override it on a specific
> store it needs to expose some APIs maybe at StateStoreSupplier. So would
> love to hear other people's opinions.
>
> If we think that different implemented restoring callback may be less
> common, then I'd suggest at least replace the `TopicPartition` parameter
> with the `String` store name and the `TaskId`?
>
> 2. I think we can pass in the `long endOffset` in the `onRestoreStart`
> function as well, as we will have read the endOffset already by then;
> otherwise users can still not be able to track the restoration progress
> (e.g. how much percentage I have been restoring so far, to estimate on how
> long I still need to wait).
>
>
> Guozhang
>
>
>
> On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Eno,
> >
> > Thanks for the comments.
> >
> > 1. As for having both restore and restoreAll, I kept the restore method
> for
> > backward compatibility as that is what is used by current implementing
> > classes. However as I think about it makes things cleaner to have a
> single
> > restore method taking a collection. I'll wait for others to weigh in, but
> > I'm leaning towards having a single restore method.
> >
> > 2. The "onBatchRestored" method is for keeping track of the restore
> process
> > as we load records from each poll request.
> >
> >    For example if the change log contained 5000 records and
> > MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
> > called 5 times each time with the ending offset of the last record in the
> > batch and the count    of the batch.   I'll update the KIP to add
> comments
> > above the interface methods.
> >
> >
> > Thanks,
> > Bill
> >
> >
> > On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <en...@gmail.com>
> > wrote:
> >
> > > Thanks Bill,
> > >
> > > A couple of questions:
> > >
> > >
> > 1. why do we need both restore and restoreAll, why can't we just have
> one,
> > > that takes a collection (i.e., restore all)? Are there cases when
> people
> > > want to restore one at a time? In that case, they could still use
> > > restoreAll with just 1 record in the collection right?
> > >
> > > 2. I don't quite get "onBatchRestored". Could you put a small comment
> on
> > > top of all three methods. An example might help here.
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> > > >
> > > > Guozhang, Damian thanks for the comments.
> > > >
> > > > Giving developers the ability to hook into StateStore recovery phases
> > was
> > > > part of my original intent. However the state the KIP is in now won't
> > > > provide this functionality.
> > > >
> > > > As a result I'll be doing a significant revision of KIP-167.  I'll be
> > > sure
> > > > to incorporate all your comments in the new revision.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > > >
> > > >> I'm largely in agreement with what Guozhang has suggested, i.e.,
> > > >> StateRestoreContext shouldn't have any setters on it and also need
> to
> > > have
> > > >> the end offset available such that people can use it derive
> progress.
> > > >> Slightly different, maybe the StateRestoreContext interface could
> be:
> > > >>
> > > >> long beginOffset()
> > > >> long endOffset()
> > > >> long currentOffset()
> > > >>
> > > >> One further thing, this currently doesn't provide developers the
> > > ability to
> > > >> hook into this information if they are using the built-in
> StateStores.
> > > Is
> > > >> this something we should be considering?
> > > >>
> > > >>
> > > >> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com>
> wrote:
> > > >>
> > > >>> Thanks for the updated KIP Bill, I have a couple of comments:
> > > >>>
> > > >>> 1) I'm assuming beginRestore / endRestore is called only once per
> > store
> > > >>> throughout the whole restoration process, and restoreAll is called
> > per
> > > >>> batch. In that case I feel we can set the StateRestoreContext as a
> > > second
> > > >>> parameter in restoreAll and in endRestore as well, and let the
> > library
> > > to
> > > >>> set the corresponding values instead and only let users to read
> > (since
> > > >> the
> > > >>> collection of key-value pairs do not contain offset information
> > anyways
> > > >>> users cannot really set the offset). The "lastOffsetRestored" would
> > be
> > > >> the
> > > >>> starting offset when called on `beginRestore`.
> > > >>>
> > > >>> 2) Users who wants to implement their own batch restoration
> callbacks
> > > >> would
> > > >>> now need to implement both `restore` and `restoreAll` while they
> > either
> > > >> let
> > > >>> `restoreAll` to call `restore` or implement the logic in
> `restoreAll`
> > > >> only
> > > >>> and never call `restore`. Maybe we can provide two abstract impl of
> > > >>> BatchingStateRestoreCallbacks which does beginRestore / endRestore
> as
> > > >>> no-ops, with one callback implementing `restoreAll` to call
> abstract
> > > >>> `restore` while the other implement `restore` to throw "not
> supported
> > > >>> exception" and keep `restoreAll` abstract.
> > > >>>
> > > >>> 3) I think we can also return the "offset limit" in
> > > StateRestoreContext,
> > > >>> which is important for users to track the restoration progress
> since
> > > >>> otherwise they could not tell how many percent of restoration has
> > > >>> completed.  I.e.:
> > > >>>
> > > >>> public interface BatchingStateRestoreCallback extends
> > > >> StateRestoreCallback
> > > >>> {
> > > >>>
> > > >>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> > > >>> StateRestoreContext
> > > >>> restoreContext);
> > > >>>
> > > >>>   void beginRestore(StateRestoreContext restoreContext);
> > > >>>
> > > >>>   void endRestore(StateRestoreContext restoreContext);
> > > >>> }
> > > >>>
> > > >>> public interface StateRestoreContext {
> > > >>>
> > > >>>  long lastOffsetRestored();
> > > >>>
> > > >>>  long endOffsetToRestore();
> > > >>>
> > > >>>  int numberRestored();
> > > >>> }
> > > >>>
> > > >>>
> > > >>> Guozhang
> > > >>>
> > > >>>
> > > >>>
> > > >>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
> > wrote:
> > > >>>
> > > >>>> Guozhang, Matthias,
> > > >>>>
> > > >>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
> > > >>>> description as well).
> > > >>>>
> > > >>>> I had thought about introducing a separate interface altogether,
> but
> > > >>>> extending the current one makes more sense.
> > > >>>>
> > > >>>> As for intermediate callbacks based on time or number of records,
> I
> > > >> think
> > > >>>> the latest update to the KIP addresses this point of querying for
> > > >>>> intermediate results, but it would be per batch restored.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Bill
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>>
> > > >>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
> > > wrote:
> > > >>>>
> > > >>>>>
> > > >>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> > > >> matthias@confluent.io>
> > > >>>>> wrote:
> > > >>>>>>
> > > >>>>>> With regard to backward compatibility, we should not change the
> > > >>> current
> > > >>>>>> interface, but add a new interface that extends the current one.
> > > >>>>>>
> > > >>>>>
> > > >>>>> ++1
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>>
> > > >>>
> > > >>> --
> > > >>> -- Guozhang
> > > >>>
> > > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Bill for the updated wiki. I have a couple more comments:

1. Setting the StateRestoreListener at the KafkaStreams granularity may not
be sufficient, as in the listener callback we do not know which store is
being restored: if the topic is a changelog topic then we may be able to
infer the state store name from the `TopicPartition`, but if the topic is a
source topic read as a KTable then we may not know which store is being
restored. Forcing users to infer the state store name from the topic
partition name would not be intuitive either. Also, the listener may be
implemented differently for different stores, and setting a global listener
would force users to branch on the topic-partition names, similar to what we
did in the global timestamp extractor. On the other hand, I also agree that
setting the listener at the per-store granularity may be a bit cumbersome,
since if users want to override it on a specific store we would need to
expose some APIs, maybe at StateStoreSupplier. So I would love to hear other
people's opinions.

If we think that different restore callback implementations per store would
be uncommon, then I'd suggest we at least replace the `TopicPartition`
parameter with the `String` store name and the `TaskId`?

2. I think we can pass in the `long endOffset` in the `onRestoreStart`
function as well, as we will have read the endOffset already by then;
otherwise users would still not be able to track the restoration progress
(e.g. what percentage has been restored so far, to estimate how long I still
need to wait).
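
To make the two points above concrete, here is a rough sketch of a listener
keyed by store name that receives the end offset up front. The interface
shape, the class names and the parameter meanings are assumptions for
illustration only, not the KIP's final API; in particular the sketch assumes
`batchEndOffset` is the next offset to restore (exclusive).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical listener shape for illustration only; not the KIP's final API.
interface StoreRestoreListener {
    void onRestoreStart(String storeName, long startOffset, long endOffset);
    // Assumption: batchEndOffset is exclusive, i.e. the next offset to restore.
    void onBatchRestored(String storeName, long batchEndOffset, long numRestored);
}

// Tracks progress per store name, so no branching on topic-partition names is needed.
class ProgressTrackingListener implements StoreRestoreListener {
    private final Map<String, long[]> ranges = new HashMap<>();      // storeName -> {start, end}
    private final Map<String, Double> percentDone = new HashMap<>(); // storeName -> 0..100

    @Override
    public void onRestoreStart(String storeName, long startOffset, long endOffset) {
        ranges.put(storeName, new long[] {startOffset, endOffset});
        percentDone.put(storeName, 0.0);
    }

    @Override
    public void onBatchRestored(String storeName, long batchEndOffset, long numRestored) {
        long[] range = ranges.get(storeName);
        if (range == null) {
            return; // start was never signalled for this store
        }
        long total = range[1] - range[0];
        double pct = total <= 0 ? 100.0 : 100.0 * (batchEndOffset - range[0]) / total;
        percentDone.put(storeName, Math.min(100.0, pct));
    }

    // Percentage restored so far; 0 for stores that have not started.
    double percentDone(String storeName) {
        return percentDone.getOrDefault(storeName, 0.0);
    }
}
```

Without the end offset at start time the `total` above could not be computed,
which is the reason for suggesting it be passed to `onRestoreStart`.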


Guozhang



On Wed, Jun 14, 2017 at 12:25 PM, Bill Bejeck <bb...@gmail.com> wrote:

> Eno,
>
> Thanks for the comments.
>
> 1. As for having both restore and restoreAll, I kept the restore method for
> backward compatibility as that is what is used by current implementing
> classes. However as I think about it makes things cleaner to have a single
> restore method taking a collection. I'll wait for others to weigh in, but
> I'm leaning towards having a single restore method.
>
> 2. The "onBatchRestored" method is for keeping track of the restore process
> as we load records from each poll request.
>
>    For example if the change log contained 5000 records and
> MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
> called 5 times each time with the ending offset of the last record in the
> batch and the count    of the batch.   I'll update the KIP to add comments
> above the interface methods.
>
>
> Thanks,
> Bill
>
>
> On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <en...@gmail.com>
> wrote:
>
> > Thanks Bill,
> >
> > A couple of questions:
> >
> >
> > 1. why do we need both restore and restoreAll, why can't we just have one,
> > that takes a collection (i.e., restore all)? Are there cases when people
> > want to restore one at a time? In that case, they could still use
> > restoreAll with just 1 record in the collection right?
> >
> > 2. I don't quite get "onBatchRestored". Could you put a small comment on
> > top of all three methods. An example might help here.
> >
> > Thanks
> > Eno
> >
> >
> > > On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> > >
> > > Guozhang, Damian thanks for the comments.
> > >
> > > Giving developers the ability to hook into StateStore recovery phases
> was
> > > part of my original intent. However the state the KIP is in now won't
> > > provide this functionality.
> > >
> > > As a result I'll be doing a significant revision of KIP-167.  I'll be
> > sure
> > > to incorporate all your comments in the new revision.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > >> I'm largely in agreement with what Guozhang has suggested, i.e.,
> > >> StateRestoreContext shouldn't have any setters on it and also need to
> > have
> > >> the end offset available such that people can use it derive progress.
> > >> Slightly different, maybe the StateRestoreContext interface could be:
> > >>
> > >> long beginOffset()
> > >> long endOffset()
> > >> long currentOffset()
> > >>
> > >> One further thing, this currently doesn't provide developers the
> > ability to
> > >> hook into this information if they are using the built-in StateStores.
> > Is
> > >> this something we should be considering?
> > >>
> > >>
> > >> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com> wrote:
> > >>
> > >>> Thanks for the updated KIP Bill, I have a couple of comments:
> > >>>
> > >>> 1) I'm assuming beginRestore / endRestore is called only once per
> store
> > >>> throughout the whole restoration process, and restoreAll is called
> per
> > >>> batch. In that case I feel we can set the StateRestoreContext as a
> > second
> > >>> parameter in restoreAll and in endRestore as well, and let the
> library
> > to
> > >>> set the corresponding values instead and only let users to read
> (since
> > >> the
> > >>> collection of key-value pairs do not contain offset information
> anyways
> > >>> users cannot really set the offset). The "lastOffsetRestored" would
> be
> > >> the
> > >>> starting offset when called on `beginRestore`.
> > >>>
> > >>> 2) Users who wants to implement their own batch restoration callbacks
> > >> would
> > >>> now need to implement both `restore` and `restoreAll` while they
> either
> > >> let
> > >>> `restoreAll` to call `restore` or implement the logic in `restoreAll`
> > >> only
> > >>> and never call `restore`. Maybe we can provide two abstract impl of
> > >>> BatchingStateRestoreCallbacks which does beginRestore / endRestore as
> > >>> no-ops, with one callback implementing `restoreAll` to call abstract
> > >>> `restore` while the other implement `restore` to throw "not supported
> > >>> exception" and keep `restoreAll` abstract.
> > >>>
> > >>> 3) I think we can also return the "offset limit" in
> > StateRestoreContext,
> > >>> which is important for users to track the restoration progress since
> > >>> otherwise they could not tell how many percent of restoration has
> > >>> completed.  I.e.:
> > >>>
> > >>> public interface BatchingStateRestoreCallback extends
> > >> StateRestoreCallback
> > >>> {
> > >>>
> > >>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> > >>> StateRestoreContext
> > >>> restoreContext);
> > >>>
> > >>>   void beginRestore(StateRestoreContext restoreContext);
> > >>>
> > >>>   void endRestore(StateRestoreContext restoreContext);
> > >>> }
> > >>>
> > >>> public interface StateRestoreContext {
> > >>>
> > >>>  long lastOffsetRestored();
> > >>>
> > >>>  long endOffsetToRestore();
> > >>>
> > >>>  int numberRestored();
> > >>> }
> > >>>
> > >>>
> > >>> Guozhang
> > >>>
> > >>>
> > >>>
> > >>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com>
> wrote:
> > >>>
> > >>>> Guozhang, Matthias,
> > >>>>
> > >>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
> > >>>> description as well).
> > >>>>
> > >>>> I had thought about introducing a separate interface altogether, but
> > >>>> extending the current one makes more sense.
> > >>>>
> > >>>> As for intermediate callbacks based on time or number of records, I
> > >> think
> > >>>> the latest update to the KIP addresses this point of querying for
> > >>>> intermediate results, but it would be per batch restored.
> > >>>>
> > >>>> Thanks,
> > >>>> Bill
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>>
> > >>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
> > wrote:
> > >>>>
> > >>>>>
> > >>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> > >> matthias@confluent.io>
> > >>>>> wrote:
> > >>>>>>
> > >>>>>> With regard to backward compatibility, we should not change the
> > >>> current
> > >>>>>> interface, but add a new interface that extends the current one.
> > >>>>>>
> > >>>>>
> > >>>>> ++1
> > >>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>>
> > >>>
> > >>> --
> > >>> -- Guozhang
> > >>>
> > >>
> >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Eno,

Thanks for the comments.

1. As for having both restore and restoreAll, I kept the restore method for
backward compatibility as that is what is used by current implementing
classes. However, as I think about it, it makes things cleaner to have a single
restore method taking a collection. I'll wait for others to weigh in, but
I'm leaning towards having a single restore method.

2. The "onBatchRestored" method is for keeping track of the restore process
as we load records from each poll request.

   For example, if the changelog contained 5000 records and
MAX_POLL_RECORDS is set to 1000, the "onBatchRestored" method would get
called 5 times, each time with the ending offset of the last record in the
batch and the count of the batch. I'll update the KIP to add comments
above the interface methods.
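
The arithmetic in that example can be sketched as a small simulation. This is
only an illustration under stated assumptions: offsets are contiguous, every
poll returns a full batch until the log is exhausted, and the class and
method names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

class BatchRestoreSimulation {

    // Returns one {lastOffsetInBatch, batchSize} pair per simulated onBatchRestored call.
    static List<long[]> simulate(long startOffset, long recordCount, int maxPollRecords) {
        if (maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollRecords must be positive");
        }
        List<long[]> calls = new ArrayList<>();
        long next = startOffset;
        long remaining = recordCount;
        while (remaining > 0) {
            long batch = Math.min(remaining, maxPollRecords);
            next += batch;
            remaining -= batch;
            calls.add(new long[] {next - 1, batch}); // offset of the last record in this batch
        }
        return calls;
    }

    public static void main(String[] args) {
        // 5000 records with a poll size of 1000, as in the example above.
        for (long[] call : simulate(0L, 5000L, 1000)) {
            System.out.println("onBatchRestored(lastOffset=" + call[0] + ", count=" + call[1] + ")");
        }
    }
}
```

If the changelog size is not a multiple of the poll size, the final call
simply carries a smaller count.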


Thanks,
Bill


On Wed, Jun 14, 2017 at 11:49 AM, Eno Thereska <en...@gmail.com>
wrote:

> Thanks Bill,
>
> A couple of questions:
>
>
> 1. why do we need both restore and restoreAll, why can't we just have one,
> that takes a collection (i.e., restore all)? Are there cases when people
> want to restore one at a time? In that case, they could still use
> restoreAll with just 1 record in the collection right?
>
> 2. I don't quite get "onBatchRestored". Could you put a small comment on
> top of all three methods. An example might help here.
>
> Thanks
> Eno
>
>
> > On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> >
> > Guozhang, Damian thanks for the comments.
> >
> > Giving developers the ability to hook into StateStore recovery phases was
> > part of my original intent. However the state the KIP is in now won't
> > provide this functionality.
> >
> > As a result I'll be doing a significant revision of KIP-167.  I'll be
> sure
> > to incorporate all your comments in the new revision.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com> wrote:
> >
> >> I'm largely in agreement with what Guozhang has suggested, i.e.,
> >> StateRestoreContext shouldn't have any setters on it and also need to
> have
> >> the end offset available such that people can use it derive progress.
> >> Slightly different, maybe the StateRestoreContext interface could be:
> >>
> >> long beginOffset()
> >> long endOffset()
> >> long currentOffset()
> >>
> >> One further thing, this currently doesn't provide developers the
> ability to
> >> hook into this information if they are using the built-in StateStores.
> Is
> >> this something we should be considering?
> >>
> >>
> >> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com> wrote:
> >>
> >>> Thanks for the updated KIP Bill, I have a couple of comments:
> >>>
> >>> 1) I'm assuming beginRestore / endRestore is called only once per store
> >>> throughout the whole restoration process, and restoreAll is called per
> >>> batch. In that case I feel we can set the StateRestoreContext as a
> second
> >>> parameter in restoreAll and in endRestore as well, and let the library
> to
> >>> set the corresponding values instead and only let users to read (since
> >> the
> >>> collection of key-value pairs do not contain offset information anyways
> >>> users cannot really set the offset). The "lastOffsetRestored" would be
> >> the
> >>> starting offset when called on `beginRestore`.
> >>>
> >>> 2) Users who wants to implement their own batch restoration callbacks
> >> would
> >>> now need to implement both `restore` and `restoreAll` while they either
> >> let
> >>> `restoreAll` to call `restore` or implement the logic in `restoreAll`
> >> only
> >>> and never call `restore`. Maybe we can provide two abstract impl of
> >>> BatchingStateRestoreCallbacks which does beginRestore / endRestore as
> >>> no-ops, with one callback implementing `restoreAll` to call abstract
> >>> `restore` while the other implement `restore` to throw "not supported
> >>> exception" and keep `restoreAll` abstract.
> >>>
> >>> 3) I think we can also return the "offset limit" in
> StateRestoreContext,
> >>> which is important for users to track the restoration progress since
> >>> otherwise they could not tell how many percent of restoration has
> >>> completed.  I.e.:
> >>>
> >>> public interface BatchingStateRestoreCallback extends
> >> StateRestoreCallback
> >>> {
> >>>
> >>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> >>> StateRestoreContext
> >>> restoreContext);
> >>>
> >>>   void beginRestore(StateRestoreContext restoreContext);
> >>>
> >>>   void endRestore(StateRestoreContext restoreContext);
> >>> }
> >>>
> >>> public interface StateRestoreContext {
> >>>
> >>>  long lastOffsetRestored();
> >>>
> >>>  long endOffsetToRestore();
> >>>
> >>>  int numberRestored();
> >>> }
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>>
> >>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com> wrote:
> >>>
> >>>> Guozhang, Matthias,
> >>>>
> >>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
> >>>> description as well).
> >>>>
> >>>> I had thought about introducing a separate interface altogether, but
> >>>> extending the current one makes more sense.
> >>>>
> >>>> As for intermediate callbacks based on time or number of records, I
> >> think
> >>>> the latest update to the KIP addresses this point of querying for
> >>>> intermediate results, but it would be per batch restored.
> >>>>
> >>>> Thanks,
> >>>> Bill
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com>
> wrote:
> >>>>
> >>>>>
> >>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> >> matthias@confluent.io>
> >>>>> wrote:
> >>>>>>
> >>>>>> With regard to backward compatibility, we should not change the
> >>> current
> >>>>>> interface, but add a new interface that extends the current one.
> >>>>>>
> >>>>>
> >>>>> ++1
> >>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
>
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Eno Thereska <en...@gmail.com>.
Thanks Bill,

A couple of questions:


1. Why do we need both restore and restoreAll? Why can't we just have one that takes a collection (i.e., restore all)? Are there cases when people want to restore one at a time? In that case, they could still use restoreAll with just 1 record in the collection, right?

2. I don't quite get "onBatchRestored". Could you put a small comment on top of all three methods? An example might help here.

Thanks
Eno


> On 8 Jun 2017, at 18:05, Bill Bejeck <bb...@gmail.com> wrote:
> 
> Guozhang, Damian thanks for the comments.
> 
> Giving developers the ability to hook into StateStore recovery phases was
> part of my original intent. However the state the KIP is in now won't
> provide this functionality.
> 
> As a result I'll be doing a significant revision of KIP-167.  I'll be sure
> to incorporate all your comments in the new revision.
> 
> Thanks,
> Bill
> 
> On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com> wrote:
> 
>> I'm largely in agreement with what Guozhang has suggested, i.e.,
>> StateRestoreContext shouldn't have any setters on it and also need to have
>> the end offset available such that people can use it derive progress.
>> Slightly different, maybe the StateRestoreContext interface could be:
>> 
>> long beginOffset()
>> long endOffset()
>> long currentOffset()
>> 
>> One further thing, this currently doesn't provide developers the ability to
>> hook into this information if they are using the built-in StateStores. Is
>> this something we should be considering?
>> 
>> 
>> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com> wrote:
>> 
>>> Thanks for the updated KIP Bill, I have a couple of comments:
>>> 
>>> 1) I'm assuming beginRestore / endRestore is called only once per store
>>> throughout the whole restoration process, and restoreAll is called per
>>> batch. In that case I feel we can set the StateRestoreContext as a second
>>> parameter in restoreAll and in endRestore as well, and let the library to
>>> set the corresponding values instead and only let users to read (since
>> the
>>> collection of key-value pairs do not contain offset information anyways
>>> users cannot really set the offset). The "lastOffsetRestored" would be
>> the
>>> starting offset when called on `beginRestore`.
>>> 
>>> 2) Users who wants to implement their own batch restoration callbacks
>> would
>>> now need to implement both `restore` and `restoreAll` while they either
>> let
>>> `restoreAll` to call `restore` or implement the logic in `restoreAll`
>> only
>>> and never call `restore`. Maybe we can provide two abstract impl of
>>> BatchingStateRestoreCallbacks which does beginRestore / endRestore as
>>> no-ops, with one callback implementing `restoreAll` to call abstract
>>> `restore` while the other implement `restore` to throw "not supported
>>> exception" and keep `restoreAll` abstract.
>>> 
>>> 3) I think we can also return the "offset limit" in StateRestoreContext,
>>> which is important for users to track the restoration progress since
>>> otherwise they could not tell how many percent of restoration has
>>> completed.  I.e.:
>>> 
>>> public interface BatchingStateRestoreCallback extends
>> StateRestoreCallback
>>> {
>>> 
>>>   void restoreAll(Collection<KeyValue<byte[], byte []>> records,
>>> StateRestoreContext
>>> restoreContext);
>>> 
>>>   void beginRestore(StateRestoreContext restoreContext);
>>> 
>>>   void endRestore(StateRestoreContext restoreContext);
>>> }
>>> 
>>> public interface StateRestoreContext {
>>> 
>>>  long lastOffsetRestored();
>>> 
>>>  long endOffsetToRestore();
>>> 
>>>  int numberRestored();
>>> }
>>> 
>>> 
>>> Guozhang
>>> 
>>> 
>>> 
>>> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com> wrote:
>>> 
>>>> Guozhang, Matthias,
>>>> 
>>>> Thanks for the comments.  I have updated the KIP, (JIRA title and
>>>> description as well).
>>>> 
>>>> I had thought about introducing a separate interface altogether, but
>>>> extending the current one makes more sense.
>>>> 
>>>> As for intermediate callbacks based on time or number of records, I
>> think
>>>> the latest update to the KIP addresses this point of querying for
>>>> intermediate results, but it would be per batch restored.
>>>> 
>>>> Thanks,
>>>> Bill
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com> wrote:
>>>> 
>>>>> 
>>>>>> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>> 
>>>>>> With regard to backward compatibility, we should not change the
>>> current
>>>>>> interface, but add a new interface that extends the current one.
>>>>>> 
>>>>> 
>>>>> ++1
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>>> 
>>> --
>>> -- Guozhang
>>> 
>> 


Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Guozhang, Damian, thanks for the comments.

Giving developers the ability to hook into StateStore recovery phases was
part of my original intent. However the state the KIP is in now won't
provide this functionality.

As a result I'll be doing a significant revision of KIP-167.  I'll be sure
to incorporate all your comments in the new revision.

Thanks,
Bill

On Wed, Jun 7, 2017 at 5:24 AM, Damian Guy <da...@gmail.com> wrote:

> I'm largely in agreement with what Guozhang has suggested, i.e.,
> StateRestoreContext shouldn't have any setters on it and also need to have
> the end offset available such that people can use it derive progress.
> Slightly different, maybe the StateRestoreContext interface could be:
>
> long beginOffset()
> long endOffset()
> long currentOffset()
>
> One further thing, this currently doesn't provide developers the ability to
> hook into this information if they are using the built-in StateStores. Is
> this something we should be considering?
>
>
> On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com> wrote:
>
> > Thanks for the updated KIP Bill, I have a couple of comments:
> >
> > 1) I'm assuming beginRestore / endRestore is called only once per store
> > throughout the whole restoration process, and restoreAll is called per
> > batch. In that case I feel we can set the StateRestoreContext as a second
> > parameter in restoreAll and in endRestore as well, and let the library to
> > set the corresponding values instead and only let users to read (since
> the
> > collection of key-value pairs do not contain offset information anyways
> > users cannot really set the offset). The "lastOffsetRestored" would be
> the
> > starting offset when called on `beginRestore`.
> >
> > 2) Users who wants to implement their own batch restoration callbacks
> would
> > now need to implement both `restore` and `restoreAll` while they either
> let
> > `restoreAll` to call `restore` or implement the logic in `restoreAll`
> only
> > and never call `restore`. Maybe we can provide two abstract impl of
> > BatchingStateRestoreCallbacks which does beginRestore / endRestore as
> > no-ops, with one callback implementing `restoreAll` to call abstract
> > `restore` while the other implement `restore` to throw "not supported
> > exception" and keep `restoreAll` abstract.
> >
> > 3) I think we can also return the "offset limit" in StateRestoreContext,
> > which is important for users to track the restoration progress since
> > otherwise they could not tell how many percent of restoration has
> > completed.  I.e.:
> >
> > public interface BatchingStateRestoreCallback extends
> StateRestoreCallback
> > {
> >
> >    void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> > StateRestoreContext
> > restoreContext);
> >
> >    void beginRestore(StateRestoreContext restoreContext);
> >
> >    void endRestore(StateRestoreContext restoreContext);
> > }
> >
> > public interface StateRestoreContext {
> >
> >   long lastOffsetRestored();
> >
> >   long endOffsetToRestore();
> >
> >   int numberRestored();
> > }
> >
> >
> > Guozhang
> >
> >
> >
> > On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com> wrote:
> >
> > > Guozhang, Matthias,
> > >
> > > Thanks for the comments.  I have updated the KIP, (JIRA title and
> > > description as well).
> > >
> > > I had thought about introducing a separate interface altogether, but
> > > extending the current one makes more sense.
> > >
> > > As for intermediate callbacks based on time or number of records, I
> think
> > > the latest update to the KIP addresses this point of querying for
> > > intermediate results, but it would be per batch restored.
> > >
> > > Thanks,
> > > Bill
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com> wrote:
> > >
> > > >
> > > > > On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <
> matthias@confluent.io>
> > > > wrote:
> > > > >
> > > > > With regard to backward compatibility, we should not change the
> > current
> > > > > interface, but add a new interface that extends the current one.
> > > > >
> > > >
> > > > ++1
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Damian Guy <da...@gmail.com>.
I'm largely in agreement with what Guozhang has suggested, i.e.,
StateRestoreContext shouldn't have any setters on it and also needs to have
the end offset available so that people can use it to derive progress.
Slightly different, maybe the StateRestoreContext interface could be:

long beginOffset()
long endOffset()
long currentOffset()
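
As a rough sketch of what "no setters" could look like, the library might
hand out an immutable snapshot per callback. The `RestoreSnapshot` class and
its `remaining()` helper are hypothetical and only here for illustration;
offsets are assumed to be contiguous.

```java
// Hypothetical read-only view, following the three accessors suggested above.
interface StateRestoreContext {
    long beginOffset();
    long endOffset();
    long currentOffset();
}

// Immutable snapshot: the library would create one per callback; users can only read it.
final class RestoreSnapshot implements StateRestoreContext {
    private final long begin;
    private final long end;
    private final long current;

    RestoreSnapshot(long begin, long end, long current) {
        if (begin > current || current > end) {
            throw new IllegalArgumentException("expected begin <= current <= end");
        }
        this.begin = begin;
        this.end = end;
        this.current = current;
    }

    @Override public long beginOffset() { return begin; }
    @Override public long endOffset() { return end; }
    @Override public long currentOffset() { return current; }

    // Records still to be restored, assuming contiguous offsets.
    long remaining() {
        return end - current;
    }
}
```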

One further thing, this currently doesn't provide developers the ability to
hook into this information if they are using the built-in StateStores. Is
this something we should be considering?


On Tue, 6 Jun 2017 at 23:32 Guozhang Wang <wa...@gmail.com> wrote:

> Thanks for the updated KIP Bill, I have a couple of comments:
>
> 1) I'm assuming beginRestore / endRestore is called only once per store
> throughout the whole restoration process, and restoreAll is called per
> batch. In that case I feel we can set the StateRestoreContext as a second
> parameter in restoreAll and in endRestore as well, and let the library to
> set the corresponding values instead and only let users to read (since the
> collection of key-value pairs do not contain offset information anyways
> users cannot really set the offset). The "lastOffsetRestored" would be the
> starting offset when called on `beginRestore`.
>
> 2) Users who wants to implement their own batch restoration callbacks would
> now need to implement both `restore` and `restoreAll` while they either let
> `restoreAll` to call `restore` or implement the logic in `restoreAll` only
> and never call `restore`. Maybe we can provide two abstract impl of
> BatchingStateRestoreCallbacks which does beginRestore / endRestore as
> no-ops, with one callback implementing `restoreAll` to call abstract
> `restore` while the other implement `restore` to throw "not supported
> exception" and keep `restoreAll` abstract.
>
> 3) I think we can also return the "offset limit" in StateRestoreContext,
> which is important for users to track the restoration progress since
> otherwise they could not tell how many percent of restoration has
> completed.  I.e.:
>
> public interface BatchingStateRestoreCallback extends StateRestoreCallback
> {
>
>    void restoreAll(Collection<KeyValue<byte[], byte []>> records,
> StateRestoreContext
> restoreContext);
>
>    void beginRestore(StateRestoreContext restoreContext);
>
>    void endRestore(StateRestoreContext restoreContext);
> }
>
> public interface StateRestoreContext {
>
>   long lastOffsetRestored();
>
>   long endOffsetToRestore();
>
>   int numberRestored();
> }
>
>
> Guozhang
>
>
>
> On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > Guozhang, Matthias,
> >
> > Thanks for the comments.  I have updated the KIP, (JIRA title and
> > description as well).
> >
> > I had thought about introducing a separate interface altogether, but
> > extending the current one makes more sense.
> >
> > As for intermediate callbacks based on time or number of records, I think
> > the latest update to the KIP addresses this point of querying for
> > intermediate results, but it would be per batch restored.
> >
> > Thanks,
> > Bill
> >
> >
> >
> >
> >
> > On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com> wrote:
> >
> > >
> > > > On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <ma...@confluent.io>
> > > wrote:
> > > >
> > > > With regard to backward compatibility, we should not change the
> current
> > > > interface, but add a new interface that extends the current one.
> > > >
> > >
> > > ++1
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the updated KIP Bill, I have a couple of comments:

1) I'm assuming beginRestore / endRestore is called only once per store
throughout the whole restoration process, and restoreAll is called per
batch. In that case I feel we can pass the StateRestoreContext as a second
parameter in restoreAll and in endRestore as well, and let the library set
the corresponding values so that users only read them (since the collection
of key-value pairs does not contain offset information anyway, users cannot
really set the offset). The "lastOffsetRestored" would be the starting
offset when called on `beginRestore`.

2) Users who want to implement their own batch restoration callbacks would
now need to implement both `restore` and `restoreAll`, and either let
`restoreAll` call `restore` or implement the logic in `restoreAll` only
and never call `restore`. Maybe we can provide two abstract implementations
of BatchingStateRestoreCallback which implement beginRestore / endRestore as
no-ops, with one implementing `restoreAll` to call an abstract `restore`
while the other implements `restore` to throw a "not supported" exception
and keeps `restoreAll` abstract.

3) I think we can also return the "offset limit" in StateRestoreContext,
which is important for users to track the restoration progress, since
otherwise they could not tell what percentage of the restoration has
completed.  I.e.:

public interface BatchingStateRestoreCallback extends StateRestoreCallback {

   void restoreAll(Collection<KeyValue<byte[], byte[]>> records,
                   StateRestoreContext restoreContext);

   void beginRestore(StateRestoreContext restoreContext);

   void endRestore(StateRestoreContext restoreContext);
}

public interface StateRestoreContext {

  long lastOffsetRestored();

  long endOffsetToRestore();

  int numberRestored();
}
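
The two abstract implementations from point 2 might look roughly like the
following. The interfaces are minimal stand-ins repeated here so the sketch
compiles on its own, and the two class names are hypothetical; none of this
is meant as the final API.

```java
import java.util.Collection;

// Minimal stand-ins so the sketch compiles on its own; the real types live in Kafka Streams.
final class KeyValue<K, V> {
    final K key;
    final V value;
    KeyValue(K key, V value) { this.key = key; this.value = value; }
}

interface StateRestoreContext {
    long lastOffsetRestored();
    long endOffsetToRestore();
    int numberRestored();
}

interface StateRestoreCallback {
    void restore(byte[] key, byte[] value);
}

interface BatchingStateRestoreCallback extends StateRestoreCallback {
    void restoreAll(Collection<KeyValue<byte[], byte[]>> records, StateRestoreContext ctx);
    void beginRestore(StateRestoreContext ctx);
    void endRestore(StateRestoreContext ctx);
}

// Variant 1 (hypothetical name): restoreAll delegates to an abstract per-record restore.
abstract class PerRecordRestoreCallback implements BatchingStateRestoreCallback {
    @Override public void beginRestore(StateRestoreContext ctx) { /* no-op */ }
    @Override public void endRestore(StateRestoreContext ctx) { /* no-op */ }

    @Override
    public void restoreAll(Collection<KeyValue<byte[], byte[]>> records, StateRestoreContext ctx) {
        for (KeyValue<byte[], byte[]> record : records) {
            restore(record.key, record.value);
        }
    }
}

// Variant 2 (hypothetical name): restoreAll stays abstract; per-record restore is rejected.
abstract class BatchOnlyRestoreCallback implements BatchingStateRestoreCallback {
    @Override public void beginRestore(StateRestoreContext ctx) { /* no-op */ }
    @Override public void endRestore(StateRestoreContext ctx) { /* no-op */ }

    @Override
    public void restore(byte[] key, byte[] value) {
        throw new UnsupportedOperationException("use restoreAll");
    }
}
```

Existing single-record callbacks would extend the first variant unchanged,
while stores with a native bulk-load path would extend the second.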


Guozhang



On Fri, Jun 2, 2017 at 9:16 AM, Bill Bejeck <bb...@gmail.com> wrote:

> Guozhang, Matthias,
>
> Thanks for the comments.  I have updated the KIP, (JIRA title and
> description as well).
>
> I had thought about introducing a separate interface altogether, but
> extending the current one makes more sense.
>
> As for intermediate callbacks based on time or number of records, I think
> the latest update to the KIP addresses this point of querying for
> intermediate results, but it would be per batch restored.
>
> Thanks,
> Bill
>
>
>
>
>
> On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com> wrote:
>
> >
> > > On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <ma...@confluent.io>
> > wrote:
> > >
> > > With regard to backward compatibility, we should not change the current
> > > interface, but add a new interface that extends the current one.
> > >
> >
> > ++1
> >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Guozhang, Matthias,

Thanks for the comments.  I have updated the KIP (and the JIRA title and
description as well).

I had thought about introducing a separate interface altogether, but
extending the current one makes more sense.

As for intermediate callbacks based on time or number of records, I think
the latest update to the KIP addresses this point of querying for
intermediate results, but it would be per batch restored.

Thanks,
Bill





On Fri, Jun 2, 2017 at 8:36 AM, Jim Jagielski <ji...@jagunet.com> wrote:

>
> > On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <ma...@confluent.io>
> wrote:
> >
> > With regard to backward compatibility, we should not change the current
> > interface, but add a new interface that extends the current one.
> >
>
> ++1
>
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Jim Jagielski <ji...@jaguNET.com>.
> On Jun 2, 2017, at 12:54 AM, Matthias J. Sax <ma...@confluent.io> wrote:
> 
> With regard to backward compatibility, we should not change the current
> interface, but add a new interface that extends the current one.
> 

++1


Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the KIP Bill.

With regard to backward compatibility, we should not change the current
interface, but add a new interface that extends the current one.

If we are going to add "begin" and "after", we might also consider adding
some intermediate callbacks. This would allow an application to
monitor the restoring progress. We could consider a callback every x
records or every x seconds. This callback would not contain a record, but
only offset metadata -- like start offset, current offset, end offset.
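
A minimal sketch of the "every x records" variant, assuming the restore loop
reports each restored record's offset to a small helper. The helper name and
its shape are hypothetical and only meant to show the idea of an offset-only
callback.

```java
import java.util.function.LongConsumer;

// Hypothetical helper: forwards an offset-only progress update once every `interval` records.
class EveryNRecordsNotifier {
    private final long interval;
    private final LongConsumer onProgress; // receives the current offset, no record payload
    private long sinceLast = 0;

    EveryNRecordsNotifier(long interval, LongConsumer onProgress) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
        this.onProgress = onProgress;
    }

    // Call after each restored record with that record's offset.
    void recordRestored(long offset) {
        sinceLast++;
        if (sinceLast >= interval) {
            sinceLast = 0;
            onProgress.accept(offset);
        }
    }
}
```

A time-based variant would compare a clock reading instead of a counter; the
start and end offsets could be passed alongside the current one.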

WDYT?


-Matthias


On 6/1/17 6:17 PM, Bill Bejeck wrote:
> Sure thing. I'll update the KIP.
> 
> Thanks,
> Bill
> 
> On Thu, Jun 1, 2017 at 6:20 PM, Guozhang Wang <wa...@gmail.com> wrote:
> 
>> There are also some requests to add "begin" and "after" callbacks in the
>> restoration func:
>>
>> https://issues.apache.org/jira/browse/KAFKA-4322
>>
>> Could we piggyback them into the same KIP?
>>
>>
>> Guozhang
>>
>> On Thu, Jun 1, 2017 at 2:04 PM, Bill Bejeck <bb...@gmail.com> wrote:
>>
>>> All,
>>>
>>> I'd like to start the discussion for adding bulk add functionality when
>>> restoring a state store.  The KIP can be found here:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+a+restoreAll+method+to+StateRestoreCallback
>>>
>>> Thanks,
>>> Bill
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 


Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Bill Bejeck <bb...@gmail.com>.
Sure thing. I'll update the KIP.

Thanks,
Bill

On Thu, Jun 1, 2017 at 6:20 PM, Guozhang Wang <wa...@gmail.com> wrote:

> There are also some requests to add "begin" and "after" callbacks in the
> restoration func:
>
> https://issues.apache.org/jira/browse/KAFKA-4322
>
> Could we piggyback them into the same KIP?
>
>
> Guozhang
>
> On Thu, Jun 1, 2017 at 2:04 PM, Bill Bejeck <bb...@gmail.com> wrote:
>
> > All,
> >
> > I'd like to start the discussion for adding bulk add functionality when
> > restoring a state store.  The KIP can be found here:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+a+restoreAll+method+to+StateRestoreCallback
> >
> > Thanks,
> > Bill
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] KIP-167: Add a restoreAll method to StateRestoreCallback

Posted by Guozhang Wang <wa...@gmail.com>.
There are also some requests to add "begin" and "after" callbacks in the
restoration func:

https://issues.apache.org/jira/browse/KAFKA-4322

Could we piggyback them into the same KIP?
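
For illustration only, "begin" and "after" hooks around a restore might look
something like the sketch below. RestoreLifecycleHooks, onRestoreStart,
onRestoreEnd, and LifecycleDemo are hypothetical names, and the String
records are a stand-in for real changelog records.

```java
import java.util.List;

// Hypothetical hooks invoked once before and once after a restore.
interface RestoreLifecycleHooks {
    void onRestoreStart(String storeName);
    void onRestoreEnd(String storeName, long totalRestored);
}

class LifecycleDemo {
    // Copies `records` into `target`, bracketed by the two hooks.
    static void restore(String storeName, List<String> records,
                        List<String> target, RestoreLifecycleHooks hooks) {
        hooks.onRestoreStart(storeName);
        long restored = 0;
        try {
            for (String record : records) {
                target.add(record);
                restored++;
            }
        } finally {
            // Runs even if the loop throws, so cleanup is not skipped.
            hooks.onRestoreEnd(storeName, restored);
        }
    }
}
```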


Guozhang

On Thu, Jun 1, 2017 at 2:04 PM, Bill Bejeck <bb...@gmail.com> wrote:

> All,
>
> I'd like to start the discussion for adding bulk add functionality when
> restoring a state store.  The KIP can be found here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-167%3A+Add+a+restoreAll+method+to+StateRestoreCallback
>
> Thanks,
> Bill
>



-- 
-- Guozhang