You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by John Roesler <jo...@vvcephei.org> on 2021/11/09 23:37:56 UTC

[DISCUSS] KIP-796: Interactive Query v2

Hello all,

I'd like to start the discussion for KIP-796, which proposes
a revamp of the Interactive Query APIs in Kafka Streams.

The proposal is here:
https://cwiki.apache.org/confluence/x/34xnCw

I look forward to your feedback!

Thank you,
-John


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the clarification, it looks good to me now.

On Wed, Nov 17, 2021 at 9:21 PM John Roesler <vv...@apache.org> wrote:

> Ah, sorry, Guozhang,
>
> It seem I was a bit too eager with starting the vote thread.
>
> 13: I think that makes perfect sense. I've updated the KIP.
>
> 14: Oof, I can't believe I overlooked those newer
> exceptions. Some of them will become exceptions in IQv2,
> whereas others will beceome individual partition QueryResult
> failures. Here is an accounting of what will become of those
> proposed exceptions:
>
> * StreamsNotStartedException: thrown when stream thread
> state is CREATED, the user can retry until to RUNNING.
>
> * StreamsRebalancingException: thrown when stream thread is
> not running and stream state is REBALANCING. This exception
> is no longer applicable. Regardless of the rebalanceing
> state of the store's task, the state will either be up to
> the requested bound or not.
>
> * StateStoreMigratedException: thrown when state store
> already closed and stream state is RUNNING. This is a per-
> partition failure, so it now maps to the
> FailureReason.NOT_PRESENT failure.
>
>
> * StateStoreNotAvailableException: thrown when state store
> closed and stream state is PENDING_SHUTDOWN / NOT_RUNNING /
> ERROR. I (subjectively) felt the name was ambiguous with
> respect to the prior condition in which a store partition is
> not locally available. This is replaced with the thrown
> exception, StreamsStoppedException (the JavaDoc states the
> that it is thrown when Streams is in any terminal state).
>
> * UnknownStateStoreException: thrown when passing an unknown
> state store. This is still a thown exception.
>
> * InvalidStateStorePartitionException: thrown when user
> requested partition is not available on the stream instance.
> If the partition actually does exist, then we will now
> return a per-partition FailureReason.NOT_PRESENT. If the
> requested partition is actually not present in the topology
> at all, then we will return the per-partition
> FailureReason.DOES_NOT_EXIST.
>
> Sorry for the oversight. The KIP has been updated.
>
> Thanks,
> -John
>
> On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote:
> > Thanks John.
> >
> > I made another pass on the KIP and overall it already looks pretty good.
> I
> > just have a couple more minor comments:
> >
> > 13: What do you think about just removing the following function in
> > QueryResult
> >
> >   // returns a failed query result because caller requested a "latest"
> > bound, but the task was
> >   // not active and running.
> >   public static <R> QueryResult<R> notActive(String currentState);
> >
> > Instead just use `notUpToBound` for the case when `latest` bound is
> > requested but the node is not the active replica. My main motivation is
> > trying to abstract away the notion of active/standby from the public APIs
> > itself, and hence capturing both this case as well as just a
> > normal "position bound not achieved" in the same return signal, until
> later
> > when we think it is indeed needed to separate them with different
> returns.
> >
> > 14: Regarding the possible exceptions being thrown from `query`, it seems
> > more exception types are possible from KIP-216:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> ,
> > should we include all in the javadocs?
> >
> >
> > Guozhang
> >
> >
> >
> > On Wed, Nov 17, 2021 at 3:25 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Thanks for the reply, Guozhang!
> > >
> > > I have updated the KIP to tie up the remaining points that
> > > we have discussed. I really appreciate your time in refining
> > > the proposal. I included a quick summary of the final state
> > > of our discussion points below.
> > >
> > > Since it seems like this discussion thread is pretty
> > > convergent, I'll go ahead and start the voting thread soon.
> > >
> > > Thanks again!
> > > -John
> > >
> > > P.S.: the final state of our discussion points:
> > >
> > > 1. I removed serdesForStore from the proposal (and moved it
> > > to Rejected Alternatives)
> > >
> > > 2. Thanks for that reference. I had overlooked that
> > > implementation. I'd note that the ListValuesStore is
> > > currently only used in the KStream API, which doesn't
> > > support queries at all. Due to its interface, it could
> > > theoretically be used to materialize a KTable, though it has
> > > no supplier provided in the typical Stores factory class.
> > >
> > > Regardless, I think that it would still be a similar story
> > > to the Segmented store. The ListValues store would simply
> > > choose to terminate the query on its own and not delegate to
> > > any of the wrapped KeyValue stores. It wouldn't matter that
> > > the wrapped stores have a query-handling facility of their
> > > own, if the wrapping store doesn't choose to delegate, the
> > > wrapped store will not try to execute any queries.
> > >
> > > Specifically regarding the key transformation that these
> > > "formatted" stores perform, when they handle the query, they
> > > would have the ability to execute the query in any way that
> > > makes sense OR to just reject the query if it doesn't make
> > > sense.
> > >
> > > 3, 4: nothing to do
> > >
> > > 5: I updated the KIP to specify the exceptions that may be
> > > thrown in `KafkaStreams#query` and to clarify that per-
> > > partition failures will be reported as per-partition failed
> > > QueryResult objects instead of thrown exceptions. That
> > > allows us to successfully serve some partitions of the
> > > request even if others fail.
> > >
> > > 6: I added a note that updating the metadata APIs is left
> > > for future work.
> > >
> > > 7: nothing to do
> > >
> > > 8. I went with StateQueryRequest and StateQueryResponse.
> > >
> > > 9, 10: nothing to do.
> > >
> > > 11: Ah, I see. That's a good point, but it's not fundamental
> > > to the framework. I think we can tackle it when we propose
> > > the actual queries.
> > >
> > > 12: Cool. I went ahead and dropped the "serdesForStore"
> > > method. I think you're onto something there, and we should
> > > tackle it separately when we propose the actual queries.
> > >
> > >
> > >
> > >
> > > On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> > > > Thanks John! Some more thoughts inlined below.
> > > >
> > > > On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks for the review, Guozhang!
> > > > >
> > > > > 1. This is a great point. I fell into the age-old trap of
> > > > > only considering the simplest store type and forgot about
> > > > > this extra wrinkle of the "key schema" that we use in
> > > > > Windowed and Session stores.
> > > > >
> > > > > Depending on how we want to forge forward with our provided
> > > > > queries, I think it can still work out ok. The simplest
> > > > > solution is just to have windowed versions of our queries
> > > > > for use on the windowed stores. That should work naively
> > > > > because we're basically just preserving the existing
> > > > > interactions. It might not be ideal in the long run, but at
> > > > > least it lets us make IQv2 orthogonal from other efforts to
> > > > > simplify the stores themselves.
> > > > >
> > > > > If we do that, then it would actually be correct to go ahead
> > > > > and just return the serdes that are present in the Metered
> > > > > stores today. For example, if I have a Windowed store with
> > > > > Integer keys, then the key serde I get from serdesForStore
> > > > > would just be the IntegerSerde. The query I'd use the
> > > > > serialized key with would be a RawWindowedKeyQuery, which
> > > > > takes a byte[] key and a timestamp. Then, the low-level
> > > > > store (the segmented store in this case) would have to take
> > > > > the next step to use its schema before making that last-mile
> > > > > query. Note, this is precisely how fetch is implemented
> > > > > today in RocksDBWindowStore:
> > > > >
> > > > > public byte[] fetch(final Bytes key, final long timestamp) {
> > > > >   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> > > > > timestamp, seqnum));
> > > > > }
> > > > >
> > > > > In other words, if we set up our provided Query types to
> > > > > stick close to the current store query methods, then
> > > > > everything "should work out" (tm).
> > > > >
> > > > > I think where things start to get more complicated would be
> > > > > if we wanted to expose the actual, raw, on-disk binary key
> > > > > to the user, along with a serde that can interpret it. Then,
> > > > > we would have to pack up the serde and the schema. If we go
> > > > > down that road, then knowing which one (the key serde or the
> > > > > windowed schema + the key serde) the user wants when they
> > > > > ask for "the serde" would be a challenge.
> > > > >
> > > > > I'm actually thinking maybe we don't need to include the
> > > > > serdesForStore method in this proposal, but instead leave it
> > > > > for follow-on work (if desired) to add it along with raw
> > > > > queries, since it's really only needed if you want raw
> > > > > queries and (as you mentioned later) there may be better
> > > > > ways to present the serdes, which is always easier to figure
> > > > > out once there's a use case.
> > > > >
> > > > >
> > > > > 2. Hmm, if I understand what you mean by the "formatted"
> > > > > layer, is that the one supplied by the
> > > > > WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
> > > > > Materialized? I think the basic idea of this proposal is to
> > > > > let whatever store gets supplied there be the "last stop"
> > > > > for the query.
> > > > >
> > > > > For the case of our default windowed store, this is the
> > > > > segmented RocksDB store. It's true that this store "wraps" a
> > > > > bunch of segments, but it would be the segmented store's
> > > > > responsibility to handle the query, not defer to the
> > > > > segments. This might mean different things for different
> > > > > queries, but naively, I think it can just invoke to the
> > > > > current implementation of each of its methods.
> > > > >
> > > > > There might be future queries that require more
> > > > > sophisticated responses, but we should be able to add new
> > > > > queries for those, which have no restrictions on their
> > > > > response types. For example, we could choose to respond to a
> > > > > scan with a list of iterators, one for each segment.
> > > > >
> > > > >
> > > > For `formatted` stores, I also mean the ListValueStore which was
> added
> > > > recently for stream-stream joins, for example. The interface is a
> > > KV-store
> > > > but that disables same-key overwrites but instead keep all the
> values of
> > > > the same key as a list, and users can only delete old values by
> deleting
> > > > the whole key-list (which would of course delete new values as well).
> > > > ListValueStore uses a KeyValueStore as its inner, but would convert
> the
> > > put
> > > > call as append.
> > > >
> > > > I think in the long run, we should have a different interface other
> than
> > > > KVStore for this type, and the implementation would then be at the
> > > > `formatted` store layer. That means the `query` should be always
> > > > implemented at the inner layer of the logged store (that could be the
> > > most
> > > > `inner` store, or the `fomatted` store), and upper level wrapped
> stores
> > > > would be delegating to the inner stores.
> > > >
> > > > As for serdes, here's some more second thoughts: generally speaking,
> it's
> > > > always convenient for users to pass in the value as object than raw
> > > bytes,
> > > > the only exception is if the query is not for exact matching but
> prefix
> > > (or
> > > > suffix, though we do not have such cases today) matching, in which
> case
> > > we
> > > > would need the raw bytes in order to pass in the prefixed bytes into
> the
> > > > inner store. The returned value though could either be preferred as
> raw
> > > > bytes, or be deserialized already.
> > > >
> > > > The composite-serde mostly happens at the key, but not much at the
> value
> > > > (we only have "value-timestamp" type which needs a composite
> > > > deserialization, all others are direct values). So I'm feeling that a
> > > Query
> > > > would be best represented with non-serialized parameter (i.e.
> > > `KeyQuery<K,
> > > > V>`), while the query result be optionally raw or deserialized with
> the
> > > > serde class.
> > > >
> > > >
> > > > >
> > > > > 3. I agree the large switch (or if/else) (or Map) for query
> > > > > dispatch is a concern. That's the thing I'm most worried
> > > > > will become cumbersome. I think your idea is neat, though,
> > > > > because a lot of our surface area is providing a bunch of
> > > > > those different combinations of query attributes. I think if
> > > > > we get a little meta, we can actually fold it into the
> > > > > existing KIP.
> > > > >
> > > > > Rather than making Query any more restrictive, what we could
> > > > > do is to choose to follow your idea for the provided queries
> > > > > we ship with Streams. Although I had been thinking we would
> > > > > ship a KeyQuery, RangeQuery, etc., we could absolutely
> > > > > compactify those queries as much as possible so that there
> > > > > are only a few queries with those dimensions you listed.
> > > > >
> > > > > That way we can avoid blowing up the query space with our
> > > > > own provided queries, but we can still keep the framework as
> > > > > general as possible.
> > > > >
> > > > >
> > > > Sounds good!
> > > >
> > > >
> > > > > 4. I'm not sure, actually! I just thought it would be neat
> > > > > to have. I know I've spent my fair share of adding println
> > > > > statements to Streams or stepping though the debugger when
> > > > > something like that proposal would have done the job.
> > > > >
> > > > > So, I guess the answer is yes, I was just thinking of it as
> > > > > a debugging/informational tool. I also think that if we want
> > > > > to make it more structured in the future, we should be able
> > > > > to evolve that part of the API without and major problems.
> > > > >
> > > > >
> > > > > 5. That's another great point, and it's a miss on my part.
> > > > > The short answer is that we'd simply throw whatever runtime
> > > > > exceptions are appropriate, but I should and will document
> > > > > what they will be.
> > > > >
> > > > >
> > > > > 6. I do think those APIs need some attention, but I was
> > > > > actually hoping to treat that as a separate scope for design
> > > > > work later. I think that there shouldn't be any downside to
> > > > > tackling them as orthogonal, but I agree people will wonder
> > > > > about the relationship there, so I can update the KIP with
> > > > > some notes about it.
> > > > >
> > > > >
> > > > Thanks! I personally would consider that these APIs would eventually
> be
> > > > refactored as well as we stick with IQv2, and also the
> > > > `allLocalStorePartitionLags` would be deprecated with Position.
> > > >
> > > >
> > > > >
> > > > > 7. Yes, I've always been a bit on the fence about whether to
> > > > > bundle that in here. The only thing that made me keep it in
> > > > > is that we'd actually have to deprecate the newly proposed
> > > > > StateStore#query method if we want to add it in later. I.e.,
> > > > > we would just propose StateStore#query(query, executionInfo)
> > > > > right now, but then deprecate it and add
> > > > > StateStore#query(query, bound, executionInfo).
> > > > >
> > > > > Given that, it seems mildly better to just take the leap for
> > > > > now, and if it turns out we can't actually implement it
> > > > > nicely, then we can always drop it from the proposal after
> > > > > the fact.
> > > > >
> > > > > That said, if that aspect is going to derail this KIP's
> > > > > discussion, I think the lesser evil would indeed be to just
> > > > > drop it now. So far, it seems like there's been some small
> > > > > questions about it, but nothing that really takes us off
> > > > > course. So, if you don't object, I think I'd like to keep it
> > > > > in for a little while longer.
> > > > >
> > > > >
> > > > That's a fair point, let's keep it in this KIP then.
> > > >
> > > >
> > > > >
> > > > > 8. Sure, I like that idea. The names are a bit cumbersome.
> > > > >
> > > > > 9. I had them as separate types so that we could more easily
> > > > > inspect the query type. Otherwise, we'd just have to assume
> > > > > the generics' type is byte[] in the lower layer. I'm not
> > > > > sure that's the right call, but it also seems like the flip
> > > > > of a coin as to which is better.
> > > > >
> > > > > 10. The StateSerdes class that we have is internal. I used
> > > > > it in the POC to save time, but I gave it a different name
> > > > > in the KIP to make it clear that I'm proposing that we
> > > > > create a proper public interface and not just expose the
> > > > > internal one, which has a bunch of extra stuff in it.
> > > > >
> > > > > Then again, if I go ahead and drop the serdes from the
> > > > > propsoal entirely, we can worry about that another time!
> > > > >
> > > > >
> > > > > 11. I think I might have a typo somewhere, because I'm not
> > > > > following the question. The Query itself defines the result
> > > > > type <R>, QueryResult is just a container wrapping that R
> > > > > result as well as the execution info, etc. per partition.
> > > > >
> > > > > For a KeyQuery, its signature is:
> > > > >  KeyQuery<K, V> implements Query<V>
> > > > >
> > > > > So, when you use that query, it does bind R to V, and the
> > > > > result will be a QueryResult<V>.
> > > > >
> > > > >
> > > > Cool thanks. My main confusion comes from the inconsistency of
> key-query
> > > > and scan-query. The former implements Query as:
> > > >
> > > > KeyQuery<K, V> implements Query<V>:  => binds V to R, and K unbound
> > > >
> > > > Whereas the latter implements as:
> > > >
> > > > ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds
> > > > KeyValueIterator<?, ?> to R, whereas K/V both unbound
> > > >
> > > >
> > > >
> > > > >
> > > > > 12. I considered doing exactly that. The reason I shied away
> > > > > from it in general is that if you're going to have a "raw"
> > > > > query API, you also need to know the key serde before you do
> > > > > a query (otherwise you can't query at all!). So, bundling a
> > > > > serde with the response only really applies to the value.
> > > > >
> > > > >
> > > > See the other comment above: my thinking is actually that, for Query
> we
> > > > would, potentially always, prefer to have it as in deserialized
> object
> > > > format (except for partial match, which we can discuss separately),
> we
> > > only
> > > > need to consider whether the QueryResult should be in raw or in
> > > > deserialized format.
> > > >
> > > >
> > > > > It still might be a good idea, but since I was thinking I
> > > > > already needed a separate discovery method for the key
> > > > > serde, then I might as well just keep the key and value
> > > > > serdes together, rather than bundling the value serde with
> > > > > each value.
> > > > >
> > > > > I do think it would be neat to have queries that don't
> > > > > deserialize the value by default and give you the option to
> > > > > do it on demand, or maybe just de-structure some parts of
> > > > > the value out (eg just reading the timestamp without
> > > > > deserializing the rest of the value). But, now that I've
> > > > > started to think about dropping the "raw" query design from
> > > > > the scope of this KIP, I'm wondering if we can just consider
> > > > > this use case later. It does seem plausible that we could
> > > > > choose to bundle the serdes with the values for those
> > > > > queries without needing a change in this KIP's framework, at
> > > > > least.
> > > > >
> > > > >
> > > > > Whew! Thanks again for the great thoughts. I'll make the
> > > > > changes I mentioned tomorrow. Please let me know if you
> > > > > disagree with any of my responses!
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > > > > > Hello John,
> > > > > >
> > > > > > Great, great, great writeup! :) And thank you for bringing this
> up
> > > > > finally.
> > > > > > I have made a pass on the KIP as well as the POC PR of it, here
> are
> > > some
> > > > > > initial thoughts:
> > > > > >
> > > > > > First are some meta ones:
> > > > > >
> > > > > > 1. Today the serdes do not only happen at the metered-store
> layer,
> > > > > > unfortunately. For windowed / sessioned stores, and also some
> newly
> > > added
> > > > > > ones for stream-stream joins that are optimized for time-based
> range
> > > > > > queries, for example, the serdes are actually composite at
> multiple
> > > > > layers.
> > > > > > And the queries on the outer interface are also translated with
> serde
> > > > > > wrapped / stripped along the way in layers. To be more specific,
> > > today
> > > > > our
> > > > > > store hierarchy is like this:
> > > > > >
> > > > > > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > > > > > list-valued) -> inner (rocksdb, in-memory)
> > > > > >
> > > > > > and serdes today could happen on the layers with * above, where
> each
> > > > > layer
> > > > > > is stuffing a bit more as prefix/suffix into the query bytes.
> This
> > > is not
> > > > > > really by design or ideal, but a result of history accumulated
> tech
> > > > > debts..
> > > > > > There's a related JIRA ticket for it:
> > > > > > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my
> point
> > > is
> > > > > that
> > > > > > we need to be a bit careful regarding how to implement the
> > > > > > `KafkaStreams#serdesForStore(storeName)`, as we may expect some
> bumpy
> > > > > roads
> > > > > > moving forward.
> > > > > >
> > > > > > 2. Related to 1 above, I think we cannot always delegate the
> > > `query()`
> > > > > > implementation to the `inner` store layer, since some serde, or
> even
> > > some
> > > > > > computation logic happens at the outer, especially the
> `formatted`
> > > layer.
> > > > > > For example, besides the cached layer, the `formatted` layer also
> > > needs
> > > > > to
> > > > > > make sure the `query` object is being appropriately translated
> > > > > beforeMaterialized
> > > > > > handing it off downstreams to the inner store, and also need to
> > > translate
> > > > > > the `queryResult` a bit while handing it upwards in the
> hierarchy.
> > > > > >
> > > > > > 3. As we add more query types in the IQv2, the inner store's
> `query`
> > > > > > instantiation may be getting very clumsy with a large "switch"
> > > condition
> > > > > on
> > > > > > all the possible query types. Although custom stores could
> consider
> > > only
> > > > > > supporting a few, having the `default` case to ignore all others,
> > > > > built-in
> > > > > > stores may still need to exhaust all possible types. I'm
> wondering if
> > > > > it's
> > > > > > a good trade-off to make `Query` be more restricted on
> extensibility
> > > to
> > > > > > have less exploding query type space, e.g. if a Query can only be
> > > > > extended
> > > > > > with some predefined dimensions like:
> > > > > >
> > > > > > * query-field: key, non-key (some field extractor from the value
> > > bytes
> > > > > need
> > > > > > to be provided)
> > > > > > * query-scope: single, range
> > > > > > * query-match-type (only be useful for a range scope):
> prefix-match
> > > (e.g.
> > > > > > for a range key query, the provided is only a prefix, and all
> keys
> > > > > > containing this prefix should be returned), full-match
> > > > > > * query-value-type: object, raw-bytes
> > > > > >
> > > > > > 4. What's the expected usage for the execution info? Is it only
> for
> > > > > logging
> > > > > > purposes? If yes then I think not enforcing any string format is
> > > fine,
> > > > > that
> > > > > > the store layers can just attach any string information that they
> > > feel
> > > > > > useful.
> > > > > >
> > > > > > 5. I do not find any specific proposals for exception handling,
> what
> > > > > would
> > > > > > that look like? E.g. besides all the expected error cases like
> > > > > non-active,
> > > > > > how would we communicate other unexpected error cases such as
> store
> > > > > closed,
> > > > > > IO error, bad query parameters, etc?
> > > > > >
> > > > > > 6. Since we do not deprecate any existing APIs in this KIP, it's
> a
> > > bit
> > > > > hard
> > > > > > for readers to understand what is eventually going to be covered
> by
> > > IQv2.
> > > > > > For example, we know that eventually `KafkaStreams#store` would
> be
> > > gone,
> > > > > > but what about `KafkaStreams#queryMetadataForKey`, and
> > > > > > `#streamsMetadataForStore`, and also
> `allLocalStorePartitionLags`? I
> > > > > think
> > > > > > it would be great to mention the end world state with IQv2 even
> if
> > > the
> > > > > KIP
> > > > > > itself would not deprecate anything yet.
> > > > > >
> > > > > > 7. It seems people are still a bit confused about the
> > > > > > "Position/PositionBound" topics, and personally I think it's
> okay to
> > > > > > exclude them in this KIP just to keep its (already large) scope
> > > smaller.
> > > > > > Also after we started implementing the KIP in full, we may have
> > > learned
> > > > > new
> > > > > > things while fighting the details in the weeds, and that would
> be a
> > > > > better
> > > > > > timing for us to consider new parameters such as bounds, but also
> > > caching
> > > > > > bypassing, and other potential features as well.
> > > > > >
> > > > > > Some minor ones:
> > > > > >
> > > > > > 8. What about just naming the new classes as
> > > `StateQueryRequest/Result`,
> > > > > or
> > > > > > `StoreQueryRequest/Result`? The word "interactive" is for
> describing
> > > its
> > > > > > semantics in docs, but I feel for class names we can use a more
> > > > > meaningful
> > > > > > prefix.
> > > > > >
> > > > > > 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or
> > > directly
> > > > > > implementing `Query<byte[]`>?
> > > > > >
> > > > > > 10. Why do we need the new class "InteractiveQuerySerdes" along
> with
> > > > > > existing classes? In your PR it seems just using `StateSerdes`
> > > directly.
> > > > > >
> > > > > > 11. Why do we have a new template type "R" in the QueryResult
> class
> > > in
> > > > > > addition to "<K, V>"? Should R always be equal to V?
> > > > > >
> > > > > > 12. Related to 10/11 above, what about letting the QueryResult to
> > > always
> > > > > be
> > > > > > returning values in raw bytes, along with the serdes? And then
> it's
> > > up to
> > > > > > the callers whether they want the bytes to be immediately
> > > deserialized or
> > > > > > want them to be written somewhere and deserialized later? More
> > > > > specifically
> > > > > > we would only have a single function as KafkaStreams#query, and
> the
> > > > > > QueryResult would be:
> > > > > >
> > > > > > InteractiveQueryResult {
> > > > > >   public InteractiveQueryResult(Map<Integer /*partition*/,
> > > > > > QueryResult<byte[]>> partitionResults);
> > > > > >
> > > > > > ...
> > > > > >
> > > > > >   public StateSerdes<K, V> serdes();
> > > > > > }
> > > > > >
> > > > > > And then the result itself can also provide some built-in
> functions
> > > to do
> > > > > > the deser upon returning results, so that user's code would not
> get
> > > more
> > > > > > complicated. The benefit is that we end up with a single
> function in
> > > > > > `KafkaStreams`, and the inner store always only need to implement
> > > the raw
> > > > > > query types. Of course doing this would not be so easy given the
> fact
> > > > > > described in 1) above, but I feel this would be a good way to
> first
> > > > > > abstract away this tech debt, and then later resolve it to a
> single
> > > > > place.
> > > > > >
> > > > > > ---------------
> > > > > >
> > > > > > Again, congrats on the very nice proposal! Let me know what you
> think
> > > > > about
> > > > > > my comments.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <
> vvcephei@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Patrick and Sagar,
> > > > > > >
> > > > > > > Thanks for the feedback! I'll just break out the questions
> > > > > > > and address them one at a time.
> > > > > > >
> > > > > > > Patrick 1.
> > > > > > > The default bound that I'm proposing is only to let active
> > > > > > > tasks answer queries (which is also the default with IQ
> > > > > > > today). Therefore, calling getPositionBound() would return a
> > > > > > > PositionBound for which isLatest() is true.
> > > > > > >
> > > > > > > Patrick 2.
> > > > > > > I might have missed something in revision, but I'm not sure
> > > > > > > what you're referring to exactly when you say they are
> > > > > > > different. The IQRequest only has a PositionBound, and the
> > > > > > > IQResponse only has a (concrete) Position, so I think they
> > > > > > > are named accordingly (getPositionBound and getPosition). Am
> > > > > > > I overlooking what you are talking about?
> > > > > > >
> > > > > > > Sagar 1.
> > > > > > > I think you're talking about the KeyValueStore#get(key)
> > > > > > > method? This is a really good question. I went ahead and
> > > > > > > dropped in an addendum to the KeyQuery example to show how
> > > > > > > you would run the query in today's API. Here's a caracature
> > > > > > > of the two APIS:
> > > > > > >
> > > > > > > current:
> > > > > > >   KeyValueStore store = kafkaStreams.store(
> > > > > > >     "mystore",
> > > > > > >     keyValueStore())
> > > > > > >   int value = store.get(key);
> > > > > > >
> > > > > > > proposed:
> > > > > > >   int value = kafkaStreams.query(
> > > > > > >     "mystore",
> > > > > > >     KeyQuery.withKey(key));
> > > > > > >
> > > > > > > So, today we first get the store interface and then we
> > > > > > > invoke the method, and under the proposal, we would instead
> > > > > > > just ask KafkaStreams to execute the query on the store. In
> > > > > > > addition to all the other stuff I said in the motivation,
> > > > > > > one thing I think is neat about this API is that it means we
> > > > > > > can re-use queries across stores. So, for example, we could
> > > > > > > also use KeyQuery on WindowStores, even though there's no
> > > > > > > common interface between WindowStore and KeyValueStore.
> > > > > > >
> > > > > > > In other words, stores can support any queries that make
> > > > > > > sense and _not_ support any queries that don't make sense.
> > > > > > > This gets into your second question...
> > > > > > >
> > > > > > > Sagar 2.
> > > > > > > Very good question. Your experience with your KIP-614
> > > > > > > contribution was one of the things that made me want to
> > > > > > > revise IQ to begin with. It seems like there's a really
> > > > > > > stark gap between how straightforward the proposal is to add
> > > > > > > a new store operation, and then how hard it is to actually
> > > > > > > implement a new operation, due to all those intervening
> > > > > > > wrappers.
> > > > > > >
> > > > > > > There are two categories of wrappers to worry about:
> > > > > > > - Facades: These only exist to disallow access to write
> > > > > > > APIs, which are exposed through IQ today but shouldn't be
> > > > > > > called. These are simply unnecessary under IQv2, since we
> > > > > > > only run queries instead of returning the whole store.
> > > > > > > - Store Layers: This is what you provided examples of. We
> > > > > > > have store layers that let us compose features like
> > > > > > > de/serialization and metering, changelogging, caching, etc.
> > > > > > > A nice thing about this design is that we mostly don't have
> > > > > > > to worry at all about those wrapper layers at all. Each of
> > > > > > > these stores would simply delegate any query to lower layers
> > > > > > > unless there is something they need to do. In my POC, I
> > > > > > > simply added a delegating implementation to
> > > > > > > WrappedStateStore, which meant that I didn't need to touch
> > > > > > > most of the wrappers when I added a new query.
> > > > > > >
> > > > > > > Here's what I think future contributors will have to worry
> > > > > > > about:
> > > > > > > 1. The basic query execution in the base byte stores
> > > > > > > (RocksDB and InMemory)
> > > > > > > 2. The Caching stores IF they want the query to be served
> > > > > > > from the cache
> > > > > > > 3. The Metered stores IF some serialization needs to be done
> > > > > > > for the query
> > > > > > >
> > > > > > > And that's it! We should be able to add new queries without
> > > > > > > touching any other store layer besides those, and each one
> > > > > > > of those is involved because it has some specific reason to
> > > > > > > be.
> > > > > > >
> > > > > > >
> > > > > > > Thanks again, Patrick and Sagar! Please let me know if I
> > > > > > > failed to address your questions, or if you have any more.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > -John
> > > > > > >
> > > > > > > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > > > > > > Hi John,
> > > > > > > >
> > > > > > > > Thanks for the great writeup! Couple of things I wanted to
> bring
> > > > > up(may
> > > > > > > or
> > > > > > > > mayn't be relevant):
> > > > > > > >
> > > > > > > > 1) The sample implementation that you have presented for
> > > KeyQuery is
> > > > > very
> > > > > > > > helpful. One thing which may be added to it is how it
> connects
> > > to the
> > > > > > > > KeyValue.get(key) method. That's something that atleast I
> > > couldn't
> > > > > > > totally
> > > > > > > > figure out-not sure about others though. I understand that
> it is
> > > out
> > > > > of
> > > > > > > > scope of th KIP to explain for every query that IQ supports
> but
> > > one
> > > > > > > > implementation just to get a sense of how the changes would
> feel
> > > > > like.
> > > > > > > > 2) The other thing that I wanted to know is that StateStore
> on
> > > it's
> > > > > own
> > > > > > > has
> > > > > > > > a lot of implementations and some of them are wrappers, So at
> > > what
> > > > > levels
> > > > > > > > do users need to implement the query methods? Like a
> > > > > MeteredKeyValueStore
> > > > > > > > wraps RocksDbStore and calls it internally through a wrapped
> > > call.
> > > > > As per
> > > > > > > > the new changes, how would the scheme of things look like
> for the
> > > > > same
> > > > > > > > KeyQuery?
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Sagar.
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > > > > > > <ps...@confluent.io.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi John,
> > > > > > > > >
> > > > > > > > > Thanks for submitting the KIP! One question I have is,
> > > assuming one
> > > > > > > > > instantiates InteractiveQueryRequest via withQuery, and
> then
> > > later
> > > > > > > calls
> > > > > > > > > getPositionBound, what will the result be? Also I noticed
> the
> > > > > Position
> > > > > > > > > returning method is in InteractiveQueryRequest and
> > > > > > > InteractiveQueryResult
> > > > > > > > > is named differently, any particular reason?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > >   Patrick
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <
> > > vvcephei@apache.org
> > > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Thanks for taking a look, Sophie!
> > > > > > > > > >
> > > > > > > > > > Ah, that was a revision error. I had initially been
> planning
> > > > > > > > > > an Optional<Set<Integer>> with Optional.empty() meaning
> to
> > > > > > > > > > fetch all partitions, but then decided it was needlessly
> > > > > > > > > > complex and changed it to the current proposal with two
> > > > > > > > > > methods:
> > > > > > > > > >
> > > > > > > > > > boolean isAllPartitions();
> > > > > > > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > > > > > > exception if it's an "all partitions" request).
> > > > > > > > > >
> > > > > > > > > > I've corrected the javadoc and also documented the
> > > > > > > > > > exception.
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > > > > > > wrote:
> > > > > > > > > > > Thanks John, I've been looking forward to this for a
> while
> > > > > now. It
> > > > > > > > > > > was pretty horrifying to learn
> > > > > > > > > > > how present-day IQ works  (or rather, doesn't work)
> with
> > > custom
> > > > > > > state
> > > > > > > > > > > stores :/
> > > > > > > > > > >
> > > > > > > > > > > One minor cosmetic point, In the
> InteractiveQueryRequest
> > > class,
> > > > > > > the #
> > > > > > > > > > > getPartitions
> > > > > > > > > > > method has a return type of Set<Integer>, but the
> javadocs
> > > > > refer to
> > > > > > > > > > Optional.
> > > > > > > > > > > Not
> > > > > > > > > > > sure which is intended for this API, but if is
> supposed to
> > > be
> > > > > the
> > > > > > > > > return
> > > > > > > > > > > type, do you perhaps
> > > > > > > > > > > mean for it to be  Optional.ofEmpty() and
> > > Optional.of(non-empty
> > > > > > > set)
> > > > > > > > > > > rather than Optional.of(empty set) and
> > > Optional.of(non-empty
> > > > > set) ?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <
> > > > > vvcephei@apache.org
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hello again, all,
> > > > > > > > > > > >
> > > > > > > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > > > > > >
> > > > > > > > > > > > If there are no concerns, I'll go ahead and call a
> vote
> > > on
> > > > > > > > > > > > Monday.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks!
> > > > > > > > > > > > -John
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler
> wrote:
> > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to start the discussion for KIP-796, which
> > > > > proposes
> > > > > > > > > > > > > a revamp of the Interactive Query APIs in Kafka
> > > Streams.
> > > > > > > > > > > > >
> > > > > > > > > > > > > The proposal is here:
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > > > > > >
> > > > > > > > > > > > > I look forward to your feedback!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Ah, sorry, Guozhang,

It seem I was a bit too eager with starting the vote thread.

13: I think that makes perfect sense. I've updated the KIP.

14: Oof, I can't believe I overlooked those newer
exceptions. Some of them will become exceptions in IQv2,
whereas others will beceome individual partition QueryResult
failures. Here is an accounting of what will become of those
proposed exceptions:

* StreamsNotStartedException: thrown when stream thread
state is CREATED, the user can retry until to RUNNING.

* StreamsRebalancingException: thrown when stream thread is
not running and stream state is REBALANCING. This exception
is no longer applicable. Regardless of the rebalanceing
state of the store's task, the state will either be up to
the requested bound or not.

* StateStoreMigratedException: thrown when state store
already closed and stream state is RUNNING. This is a per-
partition failure, so it now maps to the
FailureReason.NOT_PRESENT failure.


* StateStoreNotAvailableException: thrown when state store
closed and stream state is PENDING_SHUTDOWN / NOT_RUNNING / 
ERROR. I (subjectively) felt the name was ambiguous with
respect to the prior condition in which a store partition is
not locally available. This is replaced with the thrown
exception, StreamsStoppedException (the JavaDoc states the
that it is thrown when Streams is in any terminal state). 

* UnknownStateStoreException: thrown when passing an unknown
state store. This is still a thown exception.

* InvalidStateStorePartitionException: thrown when user
requested partition is not available on the stream instance.
If the partition actually does exist, then we will now
return a per-partition FailureReason.NOT_PRESENT. If the
requested partition is actually not present in the topology
at all, then we will return the per-partition
FailureReason.DOES_NOT_EXIST.

Sorry for the oversight. The KIP has been updated.

Thanks,
-John

On Wed, 2021-11-17 at 15:48 -0800, Guozhang Wang wrote:
> Thanks John.
> 
> I made another pass on the KIP and overall it already looks pretty good. I
> just have a couple more minor comments:
> 
> 13: What do you think about just removing the following function in
> QueryResult
> 
>   // returns a failed query result because caller requested a "latest"
> bound, but the task was
>   // not active and running.
>   public static <R> QueryResult<R> notActive(String currentState);
> 
> Instead just use `notUpToBound` for the case when `latest` bound is
> requested but the node is not the active replica. My main motivation is
> trying to abstract away the notion of active/standby from the public APIs
> itself, and hence capturing both this case as well as just a
> normal "position bound not achieved" in the same return signal, until later
> when we think it is indeed needed to separate them with different returns.
> 
> 14: Regarding the possible exceptions being thrown from `query`, it seems
> more exception types are possible from KIP-216:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors,
> should we include all in the javadocs?
> 
> 
> Guozhang
> 
> 
> 
> On Wed, Nov 17, 2021 at 3:25 PM John Roesler <vv...@apache.org> wrote:
> 
> > Thanks for the reply, Guozhang!
> > 
> > I have updated the KIP to tie up the remaining points that
> > we have discussed. I really appreciate your time in refining
> > the proposal. I included a quick summary of the final state
> > of our discussion points below.
> > 
> > Since it seems like this discussion thread is pretty
> > convergent, I'll go ahead and start the voting thread soon.
> > 
> > Thanks again!
> > -John
> > 
> > P.S.: the final state of our discussion points:
> > 
> > 1. I removed serdesForStore from the proposal (and moved it
> > to Rejected Alternatives)
> > 
> > 2. Thanks for that reference. I had overlooked that
> > implementation. I'd note that the ListValuesStore is
> > currently only used in the KStream API, which doesn't
> > support queries at all. Due to its interface, it could
> > theoretically be used to materialize a KTable, though it has
> > no supplier provided in the typical Stores factory class.
> > 
> > Regardless, I think that it would still be a similar story
> > to the Segmented store. The ListValues store would simply
> > choose to terminate the query on its own and not delegate to
> > any of the wrapped KeyValue stores. It wouldn't matter that
> > the wrapped stores have a query-handling facility of their
> > own, if the wrapping store doesn't choose to delegate, the
> > wrapped store will not try to execute any queries.
> > 
> > Specifically regarding the key transformation that these
> > "formatted" stores perform, when they handle the query, they
> > would have the ability to execute the query in any way that
> > makes sense OR to just reject the query if it doesn't make
> > sense.
> > 
> > 3, 4: nothing to do
> > 
> > 5: I updated the KIP to specify the exceptions that may be
> > thrown in `KafkaStreams#query` and to clarify that per-
> > partition failures will be reported as per-partition failed
> > QueryResult objects instead of thrown exceptions. That
> > allows us to successfully serve some partitions of the
> > request even if others fail.
> > 
> > 6: I added a note that updating the metadata APIs is left
> > for future work.
> > 
> > 7: nothing to do
> > 
> > 8. I went with StateQueryRequest and StateQueryResponse.
> > 
> > 9, 10: nothing to do.
> > 
> > 11: Ah, I see. That's a good point, but it's not fundamental
> > to the framework. I think we can tackle it when we propose
> > the actual queries.
> > 
> > 12: Cool. I went ahead and dropped the "serdesForStore"
> > method. I think you're onto something there, and we should
> > tackle it separately when we propose the actual queries.
> > 
> > 
> > 
> > 
> > On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> > > Thanks John! Some more thoughts inlined below.
> > > 
> > > On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vv...@apache.org>
> > wrote:
> > > 
> > > > Thanks for the review, Guozhang!
> > > > 
> > > > 1. This is a great point. I fell into the age-old trap of
> > > > only considering the simplest store type and forgot about
> > > > this extra wrinkle of the "key schema" that we use in
> > > > Windowed and Session stores.
> > > > 
> > > > Depending on how we want to forge forward with our provided
> > > > queries, I think it can still work out ok. The simplest
> > > > solution is just to have windowed versions of our queries
> > > > for use on the windowed stores. That should work naively
> > > > because we're basically just preserving the existing
> > > > interactions. It might not be ideal in the long run, but at
> > > > least it lets us make IQv2 orthogonal from other efforts to
> > > > simplify the stores themselves.
> > > > 
> > > > If we do that, then it would actually be correct to go ahead
> > > > and just return the serdes that are present in the Metered
> > > > stores today. For example, if I have a Windowed store with
> > > > Integer keys, then the key serde I get from serdesForStore
> > > > would just be the IntegerSerde. The query I'd use the
> > > > serialized key with would be a RawWindowedKeyQuery, which
> > > > takes a byte[] key and a timestamp. Then, the low-level
> > > > store (the segmented store in this case) would have to take
> > > > the next step to use its schema before making that last-mile
> > > > query. Note, this is precisely how fetch is implemented
> > > > today in RocksDBWindowStore:
> > > > 
> > > > public byte[] fetch(final Bytes key, final long timestamp) {
> > > >   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> > > > timestamp, seqnum));
> > > > }
> > > > 
> > > > In other words, if we set up our provided Query types to
> > > > stick close to the current store query methods, then
> > > > everything "should work out" (tm).
> > > > 
> > > > I think where things start to get more complicated would be
> > > > if we wanted to expose the actual, raw, on-disk binary key
> > > > to the user, along with a serde that can interpret it. Then,
> > > > we would have to pack up the serde and the schema. If we go
> > > > down that road, then knowing which one (the key serde or the
> > > > windowed schema + the key serde) the user wants when they
> > > > ask for "the serde" would be a challenge.
> > > > 
> > > > I'm actually thinking maybe we don't need to include the
> > > > serdesForStore method in this proposal, but instead leave it
> > > > for follow-on work (if desired) to add it along with raw
> > > > queries, since it's really only needed if you want raw
> > > > queries and (as you mentioned later) there may be better
> > > > ways to present the serdes, which is always easier to figure
> > > > out once there's a use case.
> > > > 
> > > > 
> > > > 2. Hmm, if I understand what you mean by the "formatted"
> > > > layer, is that the one supplied by the
> > > > WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
> > > > Materialized? I think the basic idea of this proposal is to
> > > > let whatever store gets supplied there be the "last stop"
> > > > for the query.
> > > > 
> > > > For the case of our default windowed store, this is the
> > > > segmented RocksDB store. It's true that this store "wraps" a
> > > > bunch of segments, but it would be the segmented store's
> > > > responsibility to handle the query, not defer to the
> > > > segments. This might mean different things for different
> > > > queries, but naively, I think it can just invoke to the
> > > > current implementation of each of its methods.
> > > > 
> > > > There might be future queries that require more
> > > > sophisticated responses, but we should be able to add new
> > > > queries for those, which have no restrictions on their
> > > > response types. For example, we could choose to respond to a
> > > > scan with a list of iterators, one for each segment.
> > > > 
> > > > 
> > > For `formatted` stores, I also mean the ListValueStore which was added
> > > recently for stream-stream joins, for example. The interface is a
> > KV-store
> > > but that disables same-key overwrites but instead keep all the values of
> > > the same key as a list, and users can only delete old values by deleting
> > > the whole key-list (which would of course delete new values as well).
> > > ListValueStore uses a KeyValueStore as its inner, but would convert the
> > put
> > > call as append.
> > > 
> > > I think in the long run, we should have a different interface other than
> > > KVStore for this type, and the implementation would then be at the
> > > `formatted` store layer. That means the `query` should be always
> > > implemented at the inner layer of the logged store (that could be the
> > most
> > > `inner` store, or the `fomatted` store), and upper level wrapped stores
> > > would be delegating to the inner stores.
> > > 
> > > As for serdes, here's some more second thoughts: generally speaking, it's
> > > always convenient for users to pass in the value as object than raw
> > bytes,
> > > the only exception is if the query is not for exact matching but prefix
> > (or
> > > suffix, though we do not have such cases today) matching, in which case
> > we
> > > would need the raw bytes in order to pass in the prefixed bytes into the
> > > inner store. The returned value though could either be preferred as raw
> > > bytes, or be deserialized already.
> > > 
> > > The composite-serde mostly happens at the key, but not much at the value
> > > (we only have "value-timestamp" type which needs a composite
> > > deserialization, all others are direct values). So I'm feeling that a
> > Query
> > > would be best represented with non-serialized parameter (i.e.
> > `KeyQuery<K,
> > > V>`), while the query result be optionally raw or deserialized with the
> > > serde class.
> > > 
> > > 
> > > > 
> > > > 3. I agree the large switch (or if/else) (or Map) for query
> > > > dispatch is a concern. That's the thing I'm most worried
> > > > will become cumbersome. I think your idea is neat, though,
> > > > because a lot of our surface area is providing a bunch of
> > > > those different combinations of query attributes. I think if
> > > > we get a little meta, we can actually fold it into the
> > > > existing KIP.
> > > > 
> > > > Rather than making Query any more restrictive, what we could
> > > > do is to choose to follow your idea for the provided queries
> > > > we ship with Streams. Although I had been thinking we would
> > > > ship a KeyQuery, RangeQuery, etc., we could absolutely
> > > > compactify those queries as much as possible so that there
> > > > are only a few queries with those dimensions you listed.
> > > > 
> > > > That way we can avoid blowing up the query space with our
> > > > own provided queries, but we can still keep the framework as
> > > > general as possible.
> > > > 
> > > > 
> > > Sounds good!
> > > 
> > > 
> > > > 4. I'm not sure, actually! I just thought it would be neat
> > > > to have. I know I've spent my fair share of adding println
> > > > statements to Streams or stepping though the debugger when
> > > > something like that proposal would have done the job.
> > > > 
> > > > So, I guess the answer is yes, I was just thinking of it as
> > > > a debugging/informational tool. I also think that if we want
> > > > to make it more structured in the future, we should be able
> > > > to evolve that part of the API without and major problems.
> > > > 
> > > > 
> > > > 5. That's another great point, and it's a miss on my part.
> > > > The short answer is that we'd simply throw whatever runtime
> > > > exceptions are appropriate, but I should and will document
> > > > what they will be.
> > > > 
> > > > 
> > > > 6. I do think those APIs need some attention, but I was
> > > > actually hoping to treat that as a separate scope for design
> > > > work later. I think that there shouldn't be any downside to
> > > > tackling them as orthogonal, but I agree people will wonder
> > > > about the relationship there, so I can update the KIP with
> > > > some notes about it.
> > > > 
> > > > 
> > > Thanks! I personally would consider that these APIs would eventually be
> > > refactored as well as we stick with IQv2, and also the
> > > `allLocalStorePartitionLags` would be deprecated with Position.
> > > 
> > > 
> > > > 
> > > > 7. Yes, I've always been a bit on the fence about whether to
> > > > bundle that in here. The only thing that made me keep it in
> > > > is that we'd actually have to deprecate the newly proposed
> > > > StateStore#query method if we want to add it in later. I.e.,
> > > > we would just propose StateStore#query(query, executionInfo)
> > > > right now, but then deprecate it and add
> > > > StateStore#query(query, bound, executionInfo).
> > > > 
> > > > Given that, it seems mildly better to just take the leap for
> > > > now, and if it turns out we can't actually implement it
> > > > nicely, then we can always drop it from the proposal after
> > > > the fact.
> > > > 
> > > > That said, if that aspect is going to derail this KIP's
> > > > discussion, I think the lesser evil would indeed be to just
> > > > drop it now. So far, it seems like there's been some small
> > > > questions about it, but nothing that really takes us off
> > > > course. So, if you don't object, I think I'd like to keep it
> > > > in for a little while longer.
> > > > 
> > > > 
> > > That's a fair point, let's keep it in this KIP then.
> > > 
> > > 
> > > > 
> > > > 8. Sure, I like that idea. The names are a bit cumbersome.
> > > > 
> > > > 9. I had them as separate types so that we could more easily
> > > > inspect the query type. Otherwise, we'd just have to assume
> > > > the generics' type is byte[] in the lower layer. I'm not
> > > > sure that's the right call, but it also seems like the flip
> > > > of a coin as to which is better.
> > > > 
> > > > 10. The StateSerdes class that we have is internal. I used
> > > > it in the POC to save time, but I gave it a different name
> > > > in the KIP to make it clear that I'm proposing that we
> > > > create a proper public interface and not just expose the
> > > > internal one, which has a bunch of extra stuff in it.
> > > > 
> > > > Then again, if I go ahead and drop the serdes from the
> > > > propsoal entirely, we can worry about that another time!
> > > > 
> > > > 
> > > > 11. I think I might have a typo somewhere, because I'm not
> > > > following the question. The Query itself defines the result
> > > > type <R>, QueryResult is just a container wrapping that R
> > > > result as well as the execution info, etc. per partition.
> > > > 
> > > > For a KeyQuery, its signature is:
> > > >  KeyQuery<K, V> implements Query<V>
> > > > 
> > > > So, when you use that query, it does bind R to V, and the
> > > > result will be a QueryResult<V>.
> > > > 
> > > > 
> > > Cool thanks. My main confusion comes from the inconsistency of key-query
> > > and scan-query. The former implements Query as:
> > > 
> > > KeyQuery<K, V> implements Query<V>:  => binds V to R, and K unbound
> > > 
> > > Whereas the latter implements as:
> > > 
> > > ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds
> > > KeyValueIterator<?, ?> to R, whereas K/V both unbound
> > > 
> > > 
> > > 
> > > > 
> > > > 12. I considered doing exactly that. The reason I shied away
> > > > from it in general is that if you're going to have a "raw"
> > > > query API, you also need to know the key serde before you do
> > > > a query (otherwise you can't query at all!). So, bundling a
> > > > serde with the response only really applies to the value.
> > > > 
> > > > 
> > > See the other comment above: my thinking is actually that, for Query we
> > > would, potentially always, prefer to have it as in deserialized object
> > > format (except for partial match, which we can discuss separately), we
> > only
> > > need to consider whether the QueryResult should be in raw or in
> > > deserialized format.
> > > 
> > > 
> > > > It still might be a good idea, but since I was thinking I
> > > > already needed a separate discovery method for the key
> > > > serde, then I might as well just keep the key and value
> > > > serdes together, rather than bundling the value serde with
> > > > each value.
> > > > 
> > > > I do think it would be neat to have queries that don't
> > > > deserialize the value by default and give you the option to
> > > > do it on demand, or maybe just de-structure some parts of
> > > > the value out (eg just reading the timestamp without
> > > > deserializing the rest of the value). But, now that I've
> > > > started to think about dropping the "raw" query design from
> > > > the scope of this KIP, I'm wondering if we can just consider
> > > > this use case later. It does seem plausible that we could
> > > > choose to bundle the serdes with the values for those
> > > > queries without needing a change in this KIP's framework, at
> > > > least.
> > > > 
> > > > 
> > > > Whew! Thanks again for the great thoughts. I'll make the
> > > > changes I mentioned tomorrow. Please let me know if you
> > > > disagree with any of my responses!
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > > > > Hello John,
> > > > > 
> > > > > Great, great, great writeup! :) And thank you for bringing this up
> > > > finally.
> > > > > I have made a pass on the KIP as well as the POC PR of it, here are
> > some
> > > > > initial thoughts:
> > > > > 
> > > > > First are some meta ones:
> > > > > 
> > > > > 1. Today the serdes do not only happen at the metered-store layer,
> > > > > unfortunately. For windowed / sessioned stores, and also some newly
> > added
> > > > > ones for stream-stream joins that are optimized for time-based range
> > > > > queries, for example, the serdes are actually composite at multiple
> > > > layers.
> > > > > And the queries on the outer interface are also translated with serde
> > > > > wrapped / stripped along the way in layers. To be more specific,
> > today
> > > > our
> > > > > store hierarchy is like this:
> > > > > 
> > > > > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > > > > list-valued) -> inner (rocksdb, in-memory)
> > > > > 
> > > > > and serdes today could happen on the layers with * above, where each
> > > > layer
> > > > > is stuffing a bit more as prefix/suffix into the query bytes. This
> > is not
> > > > > really by design or ideal, but a result of history accumulated tech
> > > > debts..
> > > > > There's a related JIRA ticket for it:
> > > > > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point
> > is
> > > > that
> > > > > we need to be a bit careful regarding how to implement the
> > > > > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy
> > > > roads
> > > > > moving forward.
> > > > > 
> > > > > 2. Related to 1 above, I think we cannot always delegate the
> > `query()`
> > > > > implementation to the `inner` store layer, since some serde, or even
> > some
> > > > > computation logic happens at the outer, especially the `formatted`
> > layer.
> > > > > For example, besides the cached layer, the `formatted` layer also
> > needs
> > > > to
> > > > > make sure the `query` object is being appropriately translated
> > > > beforeMaterialized
> > > > > handing it off downstreams to the inner store, and also need to
> > translate
> > > > > the `queryResult` a bit while handing it upwards in the hierarchy.
> > > > > 
> > > > > 3. As we add more query types in the IQv2, the inner store's `query`
> > > > > instantiation may be getting very clumsy with a large "switch"
> > condition
> > > > on
> > > > > all the possible query types. Although custom stores could consider
> > only
> > > > > supporting a few, having the `default` case to ignore all others,
> > > > built-in
> > > > > stores may still need to exhaust all possible types. I'm wondering if
> > > > it's
> > > > > a good trade-off to make `Query` be more restricted on extensibility
> > to
> > > > > have less exploding query type space, e.g. if a Query can only be
> > > > extended
> > > > > with some predefined dimensions like:
> > > > > 
> > > > > * query-field: key, non-key (some field extractor from the value
> > bytes
> > > > need
> > > > > to be provided)
> > > > > * query-scope: single, range
> > > > > * query-match-type (only be useful for a range scope): prefix-match
> > (e.g.
> > > > > for a range key query, the provided is only a prefix, and all keys
> > > > > containing this prefix should be returned), full-match
> > > > > * query-value-type: object, raw-bytes
> > > > > 
> > > > > 4. What's the expected usage for the execution info? Is it only for
> > > > logging
> > > > > purposes? If yes then I think not enforcing any string format is
> > fine,
> > > > that
> > > > > the store layers can just attach any string information that they
> > feel
> > > > > useful.
> > > > > 
> > > > > 5. I do not find any specific proposals for exception handling, what
> > > > would
> > > > > that look like? E.g. besides all the expected error cases like
> > > > non-active,
> > > > > how would we communicate other unexpected error cases such as store
> > > > closed,
> > > > > IO error, bad query parameters, etc?
> > > > > 
> > > > > 6. Since we do not deprecate any existing APIs in this KIP, it's a
> > bit
> > > > hard
> > > > > for readers to understand what is eventually going to be covered by
> > IQv2.
> > > > > For example, we know that eventually `KafkaStreams#store` would be
> > gone,
> > > > > but what about `KafkaStreams#queryMetadataForKey`, and
> > > > > `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I
> > > > think
> > > > > it would be great to mention the end world state with IQv2 even if
> > the
> > > > KIP
> > > > > itself would not deprecate anything yet.
> > > > > 
> > > > > 7. It seems people are still a bit confused about the
> > > > > "Position/PositionBound" topics, and personally I think it's okay to
> > > > > exclude them in this KIP just to keep its (already large) scope
> > smaller.
> > > > > Also after we started implementing the KIP in full, we may have
> > learned
> > > > new
> > > > > things while fighting the details in the weeds, and that would be a
> > > > better
> > > > > timing for us to consider new parameters such as bounds, but also
> > caching
> > > > > bypassing, and other potential features as well.
> > > > > 
> > > > > Some minor ones:
> > > > > 
> > > > > 8. What about just naming the new classes as
> > `StateQueryRequest/Result`,
> > > > or
> > > > > `StoreQueryRequest/Result`? The word "interactive" is for describing
> > its
> > > > > semantics in docs, but I feel for class names we can use a more
> > > > meaningful
> > > > > prefix.
> > > > > 
> > > > > 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or
> > directly
> > > > > implementing `Query<byte[]`>?
> > > > > 
> > > > > 10. Why do we need the new class "InteractiveQuerySerdes" along with
> > > > > existing classes? In your PR it seems just using `StateSerdes`
> > directly.
> > > > > 
> > > > > 11. Why do we have a new template type "R" in the QueryResult class
> > in
> > > > > addition to "<K, V>"? Should R always be equal to V?
> > > > > 
> > > > > 12. Related to 10/11 above, what about letting the QueryResult to
> > always
> > > > be
> > > > > returning values in raw bytes, along with the serdes? And then it's
> > up to
> > > > > the callers whether they want the bytes to be immediately
> > deserialized or
> > > > > want them to be written somewhere and deserialized later? More
> > > > specifically
> > > > > we would only have a single function as KafkaStreams#query, and the
> > > > > QueryResult would be:
> > > > > 
> > > > > InteractiveQueryResult {
> > > > >   public InteractiveQueryResult(Map<Integer /*partition*/,
> > > > > QueryResult<byte[]>> partitionResults);
> > > > > 
> > > > > ...
> > > > > 
> > > > >   public StateSerdes<K, V> serdes();
> > > > > }
> > > > > 
> > > > > And then the result itself can also provide some built-in functions
> > to do
> > > > > the deser upon returning results, so that user's code would not get
> > more
> > > > > complicated. The benefit is that we end up with a single function in
> > > > > `KafkaStreams`, and the inner store always only need to implement
> > the raw
> > > > > query types. Of course doing this would not be so easy given the fact
> > > > > described in 1) above, but I feel this would be a good way to first
> > > > > abstract away this tech debt, and then later resolve it to a single
> > > > place.
> > > > > 
> > > > > ---------------
> > > > > 
> > > > > Again, congrats on the very nice proposal! Let me know what you think
> > > > about
> > > > > my comments.
> > > > > 
> > > > > Guozhang
> > > > > 
> > > > > 
> > > > > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org>
> > > > wrote:
> > > > > 
> > > > > > Hi Patrick and Sagar,
> > > > > > 
> > > > > > Thanks for the feedback! I'll just break out the questions
> > > > > > and address them one at a time.
> > > > > > 
> > > > > > Patrick 1.
> > > > > > The default bound that I'm proposing is only to let active
> > > > > > tasks answer queries (which is also the default with IQ
> > > > > > today). Therefore, calling getPositionBound() would return a
> > > > > > PositionBound for which isLatest() is true.
> > > > > > 
> > > > > > Patrick 2.
> > > > > > I might have missed something in revision, but I'm not sure
> > > > > > what you're referring to exactly when you say they are
> > > > > > different. The IQRequest only has a PositionBound, and the
> > > > > > IQResponse only has a (concrete) Position, so I think they
> > > > > > are named accordingly (getPositionBound and getPosition). Am
> > > > > > I overlooking what you are talking about?
> > > > > > 
> > > > > > Sagar 1.
> > > > > > I think you're talking about the KeyValueStore#get(key)
> > > > > > method? This is a really good question. I went ahead and
> > > > > > dropped in an addendum to the KeyQuery example to show how
> > > > > > you would run the query in today's API. Here's a caracature
> > > > > > of the two APIS:
> > > > > > 
> > > > > > current:
> > > > > >   KeyValueStore store = kafkaStreams.store(
> > > > > >     "mystore",
> > > > > >     keyValueStore())
> > > > > >   int value = store.get(key);
> > > > > > 
> > > > > > proposed:
> > > > > >   int value = kafkaStreams.query(
> > > > > >     "mystore",
> > > > > >     KeyQuery.withKey(key));
> > > > > > 
> > > > > > So, today we first get the store interface and then we
> > > > > > invoke the method, and under the proposal, we would instead
> > > > > > just ask KafkaStreams to execute the query on the store. In
> > > > > > addition to all the other stuff I said in the motivation,
> > > > > > one thing I think is neat about this API is that it means we
> > > > > > can re-use queries across stores. So, for example, we could
> > > > > > also use KeyQuery on WindowStores, even though there's no
> > > > > > common interface between WindowStore and KeyValueStore.
> > > > > > 
> > > > > > In other words, stores can support any queries that make
> > > > > > sense and _not_ support any queries that don't make sense.
> > > > > > This gets into your second question...
> > > > > > 
> > > > > > Sagar 2.
> > > > > > Very good question. Your experience with your KIP-614
> > > > > > contribution was one of the things that made me want to
> > > > > > revise IQ to begin with. It seems like there's a really
> > > > > > stark gap between how straightforward the proposal is to add
> > > > > > a new store operation, and then how hard it is to actually
> > > > > > implement a new operation, due to all those intervening
> > > > > > wrappers.
> > > > > > 
> > > > > > There are two categories of wrappers to worry about:
> > > > > > - Facades: These only exist to disallow access to write
> > > > > > APIs, which are exposed through IQ today but shouldn't be
> > > > > > called. These are simply unnecessary under IQv2, since we
> > > > > > only run queries instead of returning the whole store.
> > > > > > - Store Layers: This is what you provided examples of. We
> > > > > > have store layers that let us compose features like
> > > > > > de/serialization and metering, changelogging, caching, etc.
> > > > > > A nice thing about this design is that we mostly don't have
> > > > > > to worry at all about those wrapper layers at all. Each of
> > > > > > these stores would simply delegate any query to lower layers
> > > > > > unless there is something they need to do. In my POC, I
> > > > > > simply added a delegating implementation to
> > > > > > WrappedStateStore, which meant that I didn't need to touch
> > > > > > most of the wrappers when I added a new query.
> > > > > > 
> > > > > > Here's what I think future contributors will have to worry
> > > > > > about:
> > > > > > 1. The basic query execution in the base byte stores
> > > > > > (RocksDB and InMemory)
> > > > > > 2. The Caching stores IF they want the query to be served
> > > > > > from the cache
> > > > > > 3. The Metered stores IF some serialization needs to be done
> > > > > > for the query
> > > > > > 
> > > > > > And that's it! We should be able to add new queries without
> > > > > > touching any other store layer besides those, and each one
> > > > > > of those is involved because it has some specific reason to
> > > > > > be.
> > > > > > 
> > > > > > 
> > > > > > Thanks again, Patrick and Sagar! Please let me know if I
> > > > > > failed to address your questions, or if you have any more.
> > > > > > 
> > > > > > Thanks,
> > > > > > -John
> > > > > > 
> > > > > > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > > > > > Hi John,
> > > > > > > 
> > > > > > > Thanks for the great writeup! Couple of things I wanted to bring
> > > > up(may
> > > > > > or
> > > > > > > mayn't be relevant):
> > > > > > > 
> > > > > > > 1) The sample implementation that you have presented for
> > KeyQuery is
> > > > very
> > > > > > > helpful. One thing which may be added to it is how it connects
> > to the
> > > > > > > KeyValue.get(key) method. That's something that atleast I
> > couldn't
> > > > > > totally
> > > > > > > figure out-not sure about others though. I understand that it is
> > out
> > > > of
> > > > > > > scope of th KIP to explain for every query that IQ supports but
> > one
> > > > > > > implementation just to get a sense of how the changes would feel
> > > > like.
> > > > > > > 2) The other thing that I wanted to know is that StateStore on
> > it's
> > > > own
> > > > > > has
> > > > > > > a lot of implementations and some of them are wrappers, So at
> > what
> > > > levels
> > > > > > > do users need to implement the query methods? Like a
> > > > MeteredKeyValueStore
> > > > > > > wraps RocksDbStore and calls it internally through a wrapped
> > call.
> > > > As per
> > > > > > > the new changes, how would the scheme of things look like for the
> > > > same
> > > > > > > KeyQuery?
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > > 
> > > > > > > 
> > > > > > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > > > > > <ps...@confluent.io.invalid>
> > > > > > > wrote:
> > > > > > > 
> > > > > > > > Hi John,
> > > > > > > > 
> > > > > > > > Thanks for submitting the KIP! One question I have is,
> > assuming one
> > > > > > > > instantiates InteractiveQueryRequest via withQuery, and then
> > later
> > > > > > calls
> > > > > > > > getPositionBound, what will the result be? Also I noticed the
> > > > Position
> > > > > > > > returning method is in InteractiveQueryRequest and
> > > > > > InteractiveQueryResult
> > > > > > > > is named differently, any particular reason?
> > > > > > > > 
> > > > > > > > Best,
> > > > > > > >   Patrick
> > > > > > > > 
> > > > > > > > 
> > > > > > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <
> > vvcephei@apache.org
> > > > > 
> > > > > > wrote:
> > > > > > > > 
> > > > > > > > > Thanks for taking a look, Sophie!
> > > > > > > > > 
> > > > > > > > > Ah, that was a revision error. I had initially been planning
> > > > > > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > > > > > fetch all partitions, but then decided it was needlessly
> > > > > > > > > complex and changed it to the current proposal with two
> > > > > > > > > methods:
> > > > > > > > > 
> > > > > > > > > boolean isAllPartitions();
> > > > > > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > > > > > exception if it's an "all partitions" request).
> > > > > > > > > 
> > > > > > > > > I've corrected the javadoc and also documented the
> > > > > > > > > exception.
> > > > > > > > > 
> > > > > > > > > Thanks!
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > > > > > wrote:
> > > > > > > > > > Thanks John, I've been looking forward to this for a while
> > > > now. It
> > > > > > > > > > was pretty horrifying to learn
> > > > > > > > > > how present-day IQ works  (or rather, doesn't work) with
> > custom
> > > > > > state
> > > > > > > > > > stores :/
> > > > > > > > > > 
> > > > > > > > > > One minor cosmetic point, In the InteractiveQueryRequest
> > class,
> > > > > > the #
> > > > > > > > > > getPartitions
> > > > > > > > > > method has a return type of Set<Integer>, but the javadocs
> > > > refer to
> > > > > > > > > Optional.
> > > > > > > > > > Not
> > > > > > > > > > sure which is intended for this API, but if is supposed to
> > be
> > > > the
> > > > > > > > return
> > > > > > > > > > type, do you perhaps
> > > > > > > > > > mean for it to be  Optional.ofEmpty() and
> > Optional.of(non-empty
> > > > > > set)
> > > > > > > > > > rather than Optional.of(empty set) and
> > Optional.of(non-empty
> > > > set) ?
> > > > > > > > > > 
> > > > > > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <
> > > > vvcephei@apache.org
> > > > > > > 
> > > > > > > > > wrote:
> > > > > > > > > > 
> > > > > > > > > > > Hello again, all,
> > > > > > > > > > > 
> > > > > > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > > > > > 
> > > > > > > > > > > If there are no concerns, I'll go ahead and call a vote
> > on
> > > > > > > > > > > Monday.
> > > > > > > > > > > 
> > > > > > > > > > > Thanks!
> > > > > > > > > > > -John
> > > > > > > > > > > 
> > > > > > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > > > > > Hello all,
> > > > > > > > > > > > 
> > > > > > > > > > > > I'd like to start the discussion for KIP-796, which
> > > > proposes
> > > > > > > > > > > > a revamp of the Interactive Query APIs in Kafka
> > Streams.
> > > > > > > > > > > > 
> > > > > > > > > > > > The proposal is here:
> > > > > > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > > > > > 
> > > > > > > > > > > > I look forward to your feedback!
> > > > > > > > > > > > 
> > > > > > > > > > > > Thank you,
> > > > > > > > > > > > -John
> > > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > 
> > > > > > 
> > > > > > 
> > > > > 
> > > > 
> > > > 
> > > 
> > 
> > 
> 


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks John.

I made another pass on the KIP and overall it already looks pretty good. I
just have a couple more minor comments:

13: What do you think about just removing the following function in
QueryResult

  // returns a failed query result because caller requested a "latest"
bound, but the task was
  // not active and running.
  public static <R> QueryResult<R> notActive(String currentState);

Instead just use `notUpToBound` for the case when `latest` bound is
requested but the node is not the active replica. My main motivation is
trying to abstract away the notion of active/standby from the public APIs
itself, and hence capturing both this case as well as just a
normal "position bound not achieved" in the same return signal, until later
when we think it is indeed needed to separate them with different returns.

14: Regarding the possible exceptions being thrown from `query`, it seems
more exception types are possible from KIP-216:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors,
should we include all in the javadocs?


Guozhang



On Wed, Nov 17, 2021 at 3:25 PM John Roesler <vv...@apache.org> wrote:

> Thanks for the reply, Guozhang!
>
> I have updated the KIP to tie up the remaining points that
> we have discussed. I really appreciate your time in refining
> the proposal. I included a quick summary of the final state
> of our discussion points below.
>
> Since it seems like this discussion thread is pretty
> convergent, I'll go ahead and start the voting thread soon.
>
> Thanks again!
> -John
>
> P.S.: the final state of our discussion points:
>
> 1. I removed serdesForStore from the proposal (and moved it
> to Rejected Alternatives)
>
> 2. Thanks for that reference. I had overlooked that
> implementation. I'd note that the ListValuesStore is
> currently only used in the KStream API, which doesn't
> support queries at all. Due to its interface, it could
> theoretically be used to materialize a KTable, though it has
> no supplier provided in the typical Stores factory class.
>
> Regardless, I think that it would still be a similar story
> to the Segmented store. The ListValues store would simply
> choose to terminate the query on its own and not delegate to
> any of the wrapped KeyValue stores. It wouldn't matter that
> the wrapped stores have a query-handling facility of their
> own, if the wrapping store doesn't choose to delegate, the
> wrapped store will not try to execute any queries.
>
> Specifically regarding the key transformation that these
> "formatted" stores perform, when they handle the query, they
> would have the ability to execute the query in any way that
> makes sense OR to just reject the query if it doesn't make
> sense.
>
> 3, 4: nothing to do
>
> 5: I updated the KIP to specify the exceptions that may be
> thrown in `KafkaStreams#query` and to clarify that per-
> partition failures will be reported as per-partition failed
> QueryResult objects instead of thrown exceptions. That
> allows us to successfully serve some partitions of the
> request even if others fail.
>
> 6: I added a note that updating the metadata APIs is left
> for future work.
>
> 7: nothing to do
>
> 8. I went with StateQueryRequest and StateQueryResponse.
>
> 9, 10: nothing to do.
>
> 11: Ah, I see. That's a good point, but it's not fundamental
> to the framework. I think we can tackle it when we propose
> the actual queries.
>
> 12: Cool. I went ahead and dropped the "serdesForStore"
> method. I think you're onto something there, and we should
> tackle it separately when we propose the actual queries.
>
>
>
>
> On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> > Thanks John! Some more thoughts inlined below.
> >
> > On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Thanks for the review, Guozhang!
> > >
> > > 1. This is a great point. I fell into the age-old trap of
> > > only considering the simplest store type and forgot about
> > > this extra wrinkle of the "key schema" that we use in
> > > Windowed and Session stores.
> > >
> > > Depending on how we want to forge forward with our provided
> > > queries, I think it can still work out ok. The simplest
> > > solution is just to have windowed versions of our queries
> > > for use on the windowed stores. That should work naively
> > > because we're basically just preserving the existing
> > > interactions. It might not be ideal in the long run, but at
> > > least it lets us make IQv2 orthogonal from other efforts to
> > > simplify the stores themselves.
> > >
> > > If we do that, then it would actually be correct to go ahead
> > > and just return the serdes that are present in the Metered
> > > stores today. For example, if I have a Windowed store with
> > > Integer keys, then the key serde I get from serdesForStore
> > > would just be the IntegerSerde. The query I'd use the
> > > serialized key with would be a RawWindowedKeyQuery, which
> > > takes a byte[] key and a timestamp. Then, the low-level
> > > store (the segmented store in this case) would have to take
> > > the next step to use its schema before making that last-mile
> > > query. Note, this is precisely how fetch is implemented
> > > today in RocksDBWindowStore:
> > >
> > > public byte[] fetch(final Bytes key, final long timestamp) {
> > >   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> > > timestamp, seqnum));
> > > }
> > >
> > > In other words, if we set up our provided Query types to
> > > stick close to the current store query methods, then
> > > everything "should work out" (tm).
> > >
> > > I think where things start to get more complicated would be
> > > if we wanted to expose the actual, raw, on-disk binary key
> > > to the user, along with a serde that can interpret it. Then,
> > > we would have to pack up the serde and the schema. If we go
> > > down that road, then knowing which one (the key serde or the
> > > windowed schema + the key serde) the user wants when they
> > > ask for "the serde" would be a challenge.
> > >
> > > I'm actually thinking maybe we don't need to include the
> > > serdesForStore method in this proposal, but instead leave it
> > > for follow-on work (if desired) to add it along with raw
> > > queries, since it's really only needed if you want raw
> > > queries and (as you mentioned later) there may be better
> > > ways to present the serdes, which is always easier to figure
> > > out once there's a use case.
> > >
> > >
> > > 2. Hmm, if I understand what you mean by the "formatted"
> > > layer, is that the one supplied by the
> > > WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
> > > Materialized? I think the basic idea of this proposal is to
> > > let whatever store gets supplied there be the "last stop"
> > > for the query.
> > >
> > > For the case of our default windowed store, this is the
> > > segmented RocksDB store. It's true that this store "wraps" a
> > > bunch of segments, but it would be the segmented store's
> > > responsibility to handle the query, not defer to the
> > > segments. This might mean different things for different
> > > queries, but naively, I think it can just invoke to the
> > > current implementation of each of its methods.
> > >
> > > There might be future queries that require more
> > > sophisticated responses, but we should be able to add new
> > > queries for those, which have no restrictions on their
> > > response types. For example, we could choose to respond to a
> > > scan with a list of iterators, one for each segment.
> > >
> > >
> > For `formatted` stores, I also mean the ListValueStore which was added
> > recently for stream-stream joins, for example. The interface is a
> KV-store
> > but that disables same-key overwrites but instead keep all the values of
> > the same key as a list, and users can only delete old values by deleting
> > the whole key-list (which would of course delete new values as well).
> > ListValueStore uses a KeyValueStore as its inner, but would convert the
> put
> > call as append.
> >
> > I think in the long run, we should have a different interface other than
> > KVStore for this type, and the implementation would then be at the
> > `formatted` store layer. That means the `query` should be always
> > implemented at the inner layer of the logged store (that could be the
> most
> > `inner` store, or the `fomatted` store), and upper level wrapped stores
> > would be delegating to the inner stores.
> >
> > As for serdes, here's some more second thoughts: generally speaking, it's
> > always convenient for users to pass in the value as object than raw
> bytes,
> > the only exception is if the query is not for exact matching but prefix
> (or
> > suffix, though we do not have such cases today) matching, in which case
> we
> > would need the raw bytes in order to pass in the prefixed bytes into the
> > inner store. The returned value though could either be preferred as raw
> > bytes, or be deserialized already.
> >
> > The composite-serde mostly happens at the key, but not much at the value
> > (we only have "value-timestamp" type which needs a composite
> > deserialization, all others are direct values). So I'm feeling that a
> Query
> > would be best represented with non-serialized parameter (i.e.
> `KeyQuery<K,
> > V>`), while the query result be optionally raw or deserialized with the
> > serde class.
> >
> >
> > >
> > > 3. I agree the large switch (or if/else) (or Map) for query
> > > dispatch is a concern. That's the thing I'm most worried
> > > will become cumbersome. I think your idea is neat, though,
> > > because a lot of our surface area is providing a bunch of
> > > those different combinations of query attributes. I think if
> > > we get a little meta, we can actually fold it into the
> > > existing KIP.
> > >
> > > Rather than making Query any more restrictive, what we could
> > > do is to choose to follow your idea for the provided queries
> > > we ship with Streams. Although I had been thinking we would
> > > ship a KeyQuery, RangeQuery, etc., we could absolutely
> > > compactify those queries as much as possible so that there
> > > are only a few queries with those dimensions you listed.
> > >
> > > That way we can avoid blowing up the query space with our
> > > own provided queries, but we can still keep the framework as
> > > general as possible.
> > >
> > >
> > Sounds good!
> >
> >
> > > 4. I'm not sure, actually! I just thought it would be neat
> > > to have. I know I've spent my fair share of adding println
> > > statements to Streams or stepping though the debugger when
> > > something like that proposal would have done the job.
> > >
> > > So, I guess the answer is yes, I was just thinking of it as
> > > a debugging/informational tool. I also think that if we want
> > > to make it more structured in the future, we should be able
> > > to evolve that part of the API without and major problems.
> > >
> > >
> > > 5. That's another great point, and it's a miss on my part.
> > > The short answer is that we'd simply throw whatever runtime
> > > exceptions are appropriate, but I should and will document
> > > what they will be.
> > >
> > >
> > > 6. I do think those APIs need some attention, but I was
> > > actually hoping to treat that as a separate scope for design
> > > work later. I think that there shouldn't be any downside to
> > > tackling them as orthogonal, but I agree people will wonder
> > > about the relationship there, so I can update the KIP with
> > > some notes about it.
> > >
> > >
> > Thanks! I personally would consider that these APIs would eventually be
> > refactored as well as we stick with IQv2, and also the
> > `allLocalStorePartitionLags` would be deprecated with Position.
> >
> >
> > >
> > > 7. Yes, I've always been a bit on the fence about whether to
> > > bundle that in here. The only thing that made me keep it in
> > > is that we'd actually have to deprecate the newly proposed
> > > StateStore#query method if we want to add it in later. I.e.,
> > > we would just propose StateStore#query(query, executionInfo)
> > > right now, but then deprecate it and add
> > > StateStore#query(query, bound, executionInfo).
> > >
> > > Given that, it seems mildly better to just take the leap for
> > > now, and if it turns out we can't actually implement it
> > > nicely, then we can always drop it from the proposal after
> > > the fact.
> > >
> > > That said, if that aspect is going to derail this KIP's
> > > discussion, I think the lesser evil would indeed be to just
> > > drop it now. So far, it seems like there's been some small
> > > questions about it, but nothing that really takes us off
> > > course. So, if you don't object, I think I'd like to keep it
> > > in for a little while longer.
> > >
> > >
> > That's a fair point, let's keep it in this KIP then.
> >
> >
> > >
> > > 8. Sure, I like that idea. The names are a bit cumbersome.
> > >
> > > 9. I had them as separate types so that we could more easily
> > > inspect the query type. Otherwise, we'd just have to assume
> > > the generics' type is byte[] in the lower layer. I'm not
> > > sure that's the right call, but it also seems like the flip
> > > of a coin as to which is better.
> > >
> > > 10. The StateSerdes class that we have is internal. I used
> > > it in the POC to save time, but I gave it a different name
> > > in the KIP to make it clear that I'm proposing that we
> > > create a proper public interface and not just expose the
> > > internal one, which has a bunch of extra stuff in it.
> > >
> > > Then again, if I go ahead and drop the serdes from the
> > > propsoal entirely, we can worry about that another time!
> > >
> > >
> > > 11. I think I might have a typo somewhere, because I'm not
> > > following the question. The Query itself defines the result
> > > type <R>, QueryResult is just a container wrapping that R
> > > result as well as the execution info, etc. per partition.
> > >
> > > For a KeyQuery, its signature is:
> > >  KeyQuery<K, V> implements Query<V>
> > >
> > > So, when you use that query, it does bind R to V, and the
> > > result will be a QueryResult<V>.
> > >
> > >
> > Cool thanks. My main confusion comes from the inconsistency of key-query
> > and scan-query. The former implements Query as:
> >
> > KeyQuery<K, V> implements Query<V>:  => binds V to R, and K unbound
> >
> > Whereas the latter implements as:
> >
> > ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds
> > KeyValueIterator<?, ?> to R, whereas K/V both unbound
> >
> >
> >
> > >
> > > 12. I considered doing exactly that. The reason I shied away
> > > from it in general is that if you're going to have a "raw"
> > > query API, you also need to know the key serde before you do
> > > a query (otherwise you can't query at all!). So, bundling a
> > > serde with the response only really applies to the value.
> > >
> > >
> > See the other comment above: my thinking is actually that, for Query we
> > would, potentially always, prefer to have it as in deserialized object
> > format (except for partial match, which we can discuss separately), we
> only
> > need to consider whether the QueryResult should be in raw or in
> > deserialized format.
> >
> >
> > > It still might be a good idea, but since I was thinking I
> > > already needed a separate discovery method for the key
> > > serde, then I might as well just keep the key and value
> > > serdes together, rather than bundling the value serde with
> > > each value.
> > >
> > > I do think it would be neat to have queries that don't
> > > deserialize the value by default and give you the option to
> > > do it on demand, or maybe just de-structure some parts of
> > > the value out (eg just reading the timestamp without
> > > deserializing the rest of the value). But, now that I've
> > > started to think about dropping the "raw" query design from
> > > the scope of this KIP, I'm wondering if we can just consider
> > > this use case later. It does seem plausible that we could
> > > choose to bundle the serdes with the values for those
> > > queries without needing a change in this KIP's framework, at
> > > least.
> > >
> > >
> > > Whew! Thanks again for the great thoughts. I'll make the
> > > changes I mentioned tomorrow. Please let me know if you
> > > disagree with any of my responses!
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > > > Hello John,
> > > >
> > > > Great, great, great writeup! :) And thank you for bringing this up
> > > finally.
> > > > I have made a pass on the KIP as well as the POC PR of it, here are
> some
> > > > initial thoughts:
> > > >
> > > > First are some meta ones:
> > > >
> > > > 1. Today the serdes do not only happen at the metered-store layer,
> > > > unfortunately. For windowed / sessioned stores, and also some newly
> added
> > > > ones for stream-stream joins that are optimized for time-based range
> > > > queries, for example, the serdes are actually composite at multiple
> > > layers.
> > > > And the queries on the outer interface are also translated with serde
> > > > wrapped / stripped along the way in layers. To be more specific,
> today
> > > our
> > > > store hierarchy is like this:
> > > >
> > > > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > > > list-valued) -> inner (rocksdb, in-memory)
> > > >
> > > > and serdes today could happen on the layers with * above, where each
> > > layer
> > > > is stuffing a bit more as prefix/suffix into the query bytes. This
> is not
> > > > really by design or ideal, but a result of history accumulated tech
> > > debts..
> > > > There's a related JIRA ticket for it:
> > > > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point
> is
> > > that
> > > > we need to be a bit careful regarding how to implement the
> > > > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy
> > > roads
> > > > moving forward.
> > > >
> > > > 2. Related to 1 above, I think we cannot always delegate the
> `query()`
> > > > implementation to the `inner` store layer, since some serde, or even
> some
> > > > computation logic happens at the outer, especially the `formatted`
> layer.
> > > > For example, besides the cached layer, the `formatted` layer also
> needs
> > > to
> > > > make sure the `query` object is being appropriately translated
> > > beforeMaterialized
> > > > handing it off downstreams to the inner store, and also need to
> translate
> > > > the `queryResult` a bit while handing it upwards in the hierarchy.
> > > >
> > > > 3. As we add more query types in the IQv2, the inner store's `query`
> > > > instantiation may be getting very clumsy with a large "switch"
> condition
> > > on
> > > > all the possible query types. Although custom stores could consider
> only
> > > > supporting a few, having the `default` case to ignore all others,
> > > built-in
> > > > stores may still need to exhaust all possible types. I'm wondering if
> > > it's
> > > > a good trade-off to make `Query` be more restricted on extensibility
> to
> > > > have less exploding query type space, e.g. if a Query can only be
> > > extended
> > > > with some predefined dimensions like:
> > > >
> > > > * query-field: key, non-key (some field extractor from the value
> bytes
> > > need
> > > > to be provided)
> > > > * query-scope: single, range
> > > > * query-match-type (only be useful for a range scope): prefix-match
> (e.g.
> > > > for a range key query, the provided is only a prefix, and all keys
> > > > containing this prefix should be returned), full-match
> > > > * query-value-type: object, raw-bytes
> > > >
> > > > 4. What's the expected usage for the execution info? Is it only for
> > > logging
> > > > purposes? If yes then I think not enforcing any string format is
> fine,
> > > that
> > > > the store layers can just attach any string information that they
> feel
> > > > useful.
> > > >
> > > > 5. I do not find any specific proposals for exception handling, what
> > > would
> > > > that look like? E.g. besides all the expected error cases like
> > > non-active,
> > > > how would we communicate other unexpected error cases such as store
> > > closed,
> > > > IO error, bad query parameters, etc?
> > > >
> > > > 6. Since we do not deprecate any existing APIs in this KIP, it's a
> bit
> > > hard
> > > > for readers to understand what is eventually going to be covered by
> IQv2.
> > > > For example, we know that eventually `KafkaStreams#store` would be
> gone,
> > > > but what about `KafkaStreams#queryMetadataForKey`, and
> > > > `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I
> > > think
> > > > it would be great to mention the end world state with IQv2 even if
> the
> > > KIP
> > > > itself would not deprecate anything yet.
> > > >
> > > > 7. It seems people are still a bit confused about the
> > > > "Position/PositionBound" topics, and personally I think it's okay to
> > > > exclude them in this KIP just to keep its (already large) scope
> smaller.
> > > > Also after we started implementing the KIP in full, we may have
> learned
> > > new
> > > > things while fighting the details in the weeds, and that would be a
> > > better
> > > > timing for us to consider new parameters such as bounds, but also
> caching
> > > > bypassing, and other potential features as well.
> > > >
> > > > Some minor ones:
> > > >
> > > > 8. What about just naming the new classes as
> `StateQueryRequest/Result`,
> > > or
> > > > `StoreQueryRequest/Result`? The word "interactive" is for describing
> its
> > > > semantics in docs, but I feel for class names we can use a more
> > > meaningful
> > > > prefix.
> > > >
> > > > 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or
> directly
> > > > implementing `Query<byte[]`>?
> > > >
> > > > 10. Why do we need the new class "InteractiveQuerySerdes" along with
> > > > existing classes? In your PR it seems just using `StateSerdes`
> directly.
> > > >
> > > > 11. Why do we have a new template type "R" in the QueryResult class
> in
> > > > addition to "<K, V>"? Should R always be equal to V?
> > > >
> > > > 12. Related to 10/11 above, what about letting the QueryResult to
> always
> > > be
> > > > returning values in raw bytes, along with the serdes? And then it's
> up to
> > > > the callers whether they want the bytes to be immediately
> deserialized or
> > > > want them to be written somewhere and deserialized later? More
> > > specifically
> > > > we would only have a single function as KafkaStreams#query, and the
> > > > QueryResult would be:
> > > >
> > > > InteractiveQueryResult {
> > > >   public InteractiveQueryResult(Map<Integer /*partition*/,
> > > > QueryResult<byte[]>> partitionResults);
> > > >
> > > > ...
> > > >
> > > >   public StateSerdes<K, V> serdes();
> > > > }
> > > >
> > > > And then the result itself can also provide some built-in functions
> to do
> > > > the deser upon returning results, so that user's code would not get
> more
> > > > complicated. The benefit is that we end up with a single function in
> > > > `KafkaStreams`, and the inner store always only need to implement
> the raw
> > > > query types. Of course doing this would not be so easy given the fact
> > > > described in 1) above, but I feel this would be a good way to first
> > > > abstract away this tech debt, and then later resolve it to a single
> > > place.
> > > >
> > > > ---------------
> > > >
> > > > Again, congrats on the very nice proposal! Let me know what you think
> > > about
> > > > my comments.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Patrick and Sagar,
> > > > >
> > > > > Thanks for the feedback! I'll just break out the questions
> > > > > and address them one at a time.
> > > > >
> > > > > Patrick 1.
> > > > > The default bound that I'm proposing is only to let active
> > > > > tasks answer queries (which is also the default with IQ
> > > > > today). Therefore, calling getPositionBound() would return a
> > > > > PositionBound for which isLatest() is true.
> > > > >
> > > > > Patrick 2.
> > > > > I might have missed something in revision, but I'm not sure
> > > > > what you're referring to exactly when you say they are
> > > > > different. The IQRequest only has a PositionBound, and the
> > > > > IQResponse only has a (concrete) Position, so I think they
> > > > > are named accordingly (getPositionBound and getPosition). Am
> > > > > I overlooking what you are talking about?
> > > > >
> > > > > Sagar 1.
> > > > > I think you're talking about the KeyValueStore#get(key)
> > > > > method? This is a really good question. I went ahead and
> > > > > dropped in an addendum to the KeyQuery example to show how
> > > > > you would run the query in today's API. Here's a caracature
> > > > > of the two APIS:
> > > > >
> > > > > current:
> > > > >   KeyValueStore store = kafkaStreams.store(
> > > > >     "mystore",
> > > > >     keyValueStore())
> > > > >   int value = store.get(key);
> > > > >
> > > > > proposed:
> > > > >   int value = kafkaStreams.query(
> > > > >     "mystore",
> > > > >     KeyQuery.withKey(key));
> > > > >
> > > > > So, today we first get the store interface and then we
> > > > > invoke the method, and under the proposal, we would instead
> > > > > just ask KafkaStreams to execute the query on the store. In
> > > > > addition to all the other stuff I said in the motivation,
> > > > > one thing I think is neat about this API is that it means we
> > > > > can re-use queries across stores. So, for example, we could
> > > > > also use KeyQuery on WindowStores, even though there's no
> > > > > common interface between WindowStore and KeyValueStore.
> > > > >
> > > > > In other words, stores can support any queries that make
> > > > > sense and _not_ support any queries that don't make sense.
> > > > > This gets into your second question...
> > > > >
> > > > > Sagar 2.
> > > > > Very good question. Your experience with your KIP-614
> > > > > contribution was one of the things that made me want to
> > > > > revise IQ to begin with. It seems like there's a really
> > > > > stark gap between how straightforward the proposal is to add
> > > > > a new store operation, and then how hard it is to actually
> > > > > implement a new operation, due to all those intervening
> > > > > wrappers.
> > > > >
> > > > > There are two categories of wrappers to worry about:
> > > > > - Facades: These only exist to disallow access to write
> > > > > APIs, which are exposed through IQ today but shouldn't be
> > > > > called. These are simply unnecessary under IQv2, since we
> > > > > only run queries instead of returning the whole store.
> > > > > - Store Layers: This is what you provided examples of. We
> > > > > have store layers that let us compose features like
> > > > > de/serialization and metering, changelogging, caching, etc.
> > > > > A nice thing about this design is that we mostly don't have
> > > > > to worry at all about those wrapper layers at all. Each of
> > > > > these stores would simply delegate any query to lower layers
> > > > > unless there is something they need to do. In my POC, I
> > > > > simply added a delegating implementation to
> > > > > WrappedStateStore, which meant that I didn't need to touch
> > > > > most of the wrappers when I added a new query.
> > > > >
> > > > > Here's what I think future contributors will have to worry
> > > > > about:
> > > > > 1. The basic query execution in the base byte stores
> > > > > (RocksDB and InMemory)
> > > > > 2. The Caching stores IF they want the query to be served
> > > > > from the cache
> > > > > 3. The Metered stores IF some serialization needs to be done
> > > > > for the query
> > > > >
> > > > > And that's it! We should be able to add new queries without
> > > > > touching any other store layer besides those, and each one
> > > > > of those is involved because it has some specific reason to
> > > > > be.
> > > > >
> > > > >
> > > > > Thanks again, Patrick and Sagar! Please let me know if I
> > > > > failed to address your questions, or if you have any more.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > > > > Hi John,
> > > > > >
> > > > > > Thanks for the great writeup! Couple of things I wanted to bring
> > > up(may
> > > > > or
> > > > > > mayn't be relevant):
> > > > > >
> > > > > > 1) The sample implementation that you have presented for
> KeyQuery is
> > > very
> > > > > > helpful. One thing which may be added to it is how it connects
> to the
> > > > > > KeyValue.get(key) method. That's something that atleast I
> couldn't
> > > > > totally
> > > > > > figure out-not sure about others though. I understand that it is
> out
> > > of
> > > > > > scope of th KIP to explain for every query that IQ supports but
> one
> > > > > > implementation just to get a sense of how the changes would feel
> > > like.
> > > > > > 2) The other thing that I wanted to know is that StateStore on
> it's
> > > own
> > > > > has
> > > > > > a lot of implementations and some of them are wrappers, So at
> what
> > > levels
> > > > > > do users need to implement the query methods? Like a
> > > MeteredKeyValueStore
> > > > > > wraps RocksDbStore and calls it internally through a wrapped
> call.
> > > As per
> > > > > > the new changes, how would the scheme of things look like for the
> > > same
> > > > > > KeyQuery?
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > > > > <ps...@confluent.io.invalid>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi John,
> > > > > > >
> > > > > > > Thanks for submitting the KIP! One question I have is,
> assuming one
> > > > > > > instantiates InteractiveQueryRequest via withQuery, and then
> later
> > > > > calls
> > > > > > > getPositionBound, what will the result be? Also I noticed the
> > > Position
> > > > > > > returning method is in InteractiveQueryRequest and
> > > > > InteractiveQueryResult
> > > > > > > is named differently, any particular reason?
> > > > > > >
> > > > > > > Best,
> > > > > > >   Patrick
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <
> vvcephei@apache.org
> > > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for taking a look, Sophie!
> > > > > > > >
> > > > > > > > Ah, that was a revision error. I had initially been planning
> > > > > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > > > > fetch all partitions, but then decided it was needlessly
> > > > > > > > complex and changed it to the current proposal with two
> > > > > > > > methods:
> > > > > > > >
> > > > > > > > boolean isAllPartitions();
> > > > > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > > > > exception if it's an "all partitions" request).
> > > > > > > >
> > > > > > > > I've corrected the javadoc and also documented the
> > > > > > > > exception.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > > > > wrote:
> > > > > > > > > Thanks John, I've been looking forward to this for a while
> > > now. It
> > > > > > > > > was pretty horrifying to learn
> > > > > > > > > how present-day IQ works  (or rather, doesn't work) with
> custom
> > > > > state
> > > > > > > > > stores :/
> > > > > > > > >
> > > > > > > > > One minor cosmetic point, In the InteractiveQueryRequest
> class,
> > > > > the #
> > > > > > > > > getPartitions
> > > > > > > > > method has a return type of Set<Integer>, but the javadocs
> > > refer to
> > > > > > > > Optional.
> > > > > > > > > Not
> > > > > > > > > sure which is intended for this API, but if is supposed to
> be
> > > the
> > > > > > > return
> > > > > > > > > type, do you perhaps
> > > > > > > > > mean for it to be  Optional.ofEmpty() and
> Optional.of(non-empty
> > > > > set)
> > > > > > > > > rather than Optional.of(empty set) and
> Optional.of(non-empty
> > > set) ?
> > > > > > > > >
> > > > > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <
> > > vvcephei@apache.org
> > > > > >
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hello again, all,
> > > > > > > > > >
> > > > > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > > > >
> > > > > > > > > > If there are no concerns, I'll go ahead and call a vote
> on
> > > > > > > > > > Monday.
> > > > > > > > > >
> > > > > > > > > > Thanks!
> > > > > > > > > > -John
> > > > > > > > > >
> > > > > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > > > > Hello all,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to start the discussion for KIP-796, which
> > > proposes
> > > > > > > > > > > a revamp of the Interactive Query APIs in Kafka
> Streams.
> > > > > > > > > > >
> > > > > > > > > > > The proposal is here:
> > > > > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > > > >
> > > > > > > > > > > I look forward to your feedback!
> > > > > > > > > > >
> > > > > > > > > > > Thank you,
> > > > > > > > > > > -John
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > >
> > > > >
> > > >
> > >
> > >
> >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Thanks for the reply, Guozhang!

I have updated the KIP to tie up the remaining points that
we have discussed. I really appreciate your time in refining
the proposal. I included a quick summary of the final state
of our discussion points below.

Since it seems like this discussion thread is pretty
convergent, I'll go ahead and start the voting thread soon.

Thanks again!
-John

P.S.: the final state of our discussion points:

1. I removed serdesForStore from the proposal (and moved it
to Rejected Alternatives)

2. Thanks for that reference. I had overlooked that
implementation. I'd note that the ListValuesStore is
currently only used in the KStream API, which doesn't
support queries at all. Due to its interface, it could
theoretically be used to materialize a KTable, though it has
no supplier provided in the typical Stores factory class.

Regardless, I think that it would still be a similar story
to the Segmented store. The ListValues store would simply
choose to terminate the query on its own and not delegate to
any of the wrapped KeyValue stores. It wouldn't matter that
the wrapped stores have a query-handling facility of their
own, if the wrapping store doesn't choose to delegate, the
wrapped store will not try to execute any queries.

Specifically regarding the key transformation that these
"formatted" stores perform, when they handle the query, they
would have the ability to execute the query in any way that
makes sense OR to just reject the query if it doesn't make
sense.

3, 4: nothing to do

5: I updated the KIP to specify the exceptions that may be
thrown in `KafkaStreams#query` and to clarify that per-
partition failures will be reported as per-partition failed
QueryResult objects instead of thrown exceptions. That
allows us to successfully serve some partitions of the
request even if others fail.

6: I added a note that updating the metadata APIs is left
for future work.

7: nothing to do

8. I went with StateQueryRequest and StateQueryResponse.

9, 10: nothing to do.

11: Ah, I see. That's a good point, but it's not fundamental
to the framework. I think we can tackle it when we propose
the actual queries.

12: Cool. I went ahead and dropped the "serdesForStore"
method. I think you're onto something there, and we should
tackle it separately when we propose the actual queries.




On Tue, 2021-11-16 at 15:59 -0800, Guozhang Wang wrote:
> Thanks John! Some more thoughts inlined below.
> 
> On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vv...@apache.org> wrote:
> 
> > Thanks for the review, Guozhang!
> > 
> > 1. This is a great point. I fell into the age-old trap of
> > only considering the simplest store type and forgot about
> > this extra wrinkle of the "key schema" that we use in
> > Windowed and Session stores.
> > 
> > Depending on how we want to forge forward with our provided
> > queries, I think it can still work out ok. The simplest
> > solution is just to have windowed versions of our queries
> > for use on the windowed stores. That should work naively
> > because we're basically just preserving the existing
> > interactions. It might not be ideal in the long run, but at
> > least it lets us make IQv2 orthogonal from other efforts to
> > simplify the stores themselves.
> > 
> > If we do that, then it would actually be correct to go ahead
> > and just return the serdes that are present in the Metered
> > stores today. For example, if I have a Windowed store with
> > Integer keys, then the key serde I get from serdesForStore
> > would just be the IntegerSerde. The query I'd use the
> > serialized key with would be a RawWindowedKeyQuery, which
> > takes a byte[] key and a timestamp. Then, the low-level
> > store (the segmented store in this case) would have to take
> > the next step to use its schema before making that last-mile
> > query. Note, this is precisely how fetch is implemented
> > today in RocksDBWindowStore:
> > 
> > public byte[] fetch(final Bytes key, final long timestamp) {
> >   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> > timestamp, seqnum));
> > }
> > 
> > In other words, if we set up our provided Query types to
> > stick close to the current store query methods, then
> > everything "should work out" (tm).
> > 
> > I think where things start to get more complicated would be
> > if we wanted to expose the actual, raw, on-disk binary key
> > to the user, along with a serde that can interpret it. Then,
> > we would have to pack up the serde and the schema. If we go
> > down that road, then knowing which one (the key serde or the
> > windowed schema + the key serde) the user wants when they
> > ask for "the serde" would be a challenge.
> > 
> > I'm actually thinking maybe we don't need to include the
> > serdesForStore method in this proposal, but instead leave it
> > for follow-on work (if desired) to add it along with raw
> > queries, since it's really only needed if you want raw
> > queries and (as you mentioned later) there may be better
> > ways to present the serdes, which is always easier to figure
> > out once there's a use case.
> > 
> > 
> > 2. Hmm, if I understand what you mean by the "formatted"
> > layer, is that the one supplied by the
> > WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
> > Materialized? I think the basic idea of this proposal is to
> > let whatever store gets supplied there be the "last stop"
> > for the query.
> > 
> > For the case of our default windowed store, this is the
> > segmented RocksDB store. It's true that this store "wraps" a
> > bunch of segments, but it would be the segmented store's
> > responsibility to handle the query, not defer to the
> > segments. This might mean different things for different
> > queries, but naively, I think it can just invoke to the
> > current implementation of each of its methods.
> > 
> > There might be future queries that require more
> > sophisticated responses, but we should be able to add new
> > queries for those, which have no restrictions on their
> > response types. For example, we could choose to respond to a
> > scan with a list of iterators, one for each segment.
> > 
> > 
> For `formatted` stores, I also mean the ListValueStore which was added
> recently for stream-stream joins, for example. The interface is a KV-store
> but that disables same-key overwrites but instead keep all the values of
> the same key as a list, and users can only delete old values by deleting
> the whole key-list (which would of course delete new values as well).
> ListValueStore uses a KeyValueStore as its inner, but would convert the put
> call as append.
> 
> I think in the long run, we should have a different interface other than
> KVStore for this type, and the implementation would then be at the
> `formatted` store layer. That means the `query` should be always
> implemented at the inner layer of the logged store (that could be the most
> `inner` store, or the `fomatted` store), and upper level wrapped stores
> would be delegating to the inner stores.
> 
> As for serdes, here's some more second thoughts: generally speaking, it's
> always convenient for users to pass in the value as object than raw bytes,
> the only exception is if the query is not for exact matching but prefix (or
> suffix, though we do not have such cases today) matching, in which case we
> would need the raw bytes in order to pass in the prefixed bytes into the
> inner store. The returned value though could either be preferred as raw
> bytes, or be deserialized already.
> 
> The composite-serde mostly happens at the key, but not much at the value
> (we only have "value-timestamp" type which needs a composite
> deserialization, all others are direct values). So I'm feeling that a Query
> would be best represented with non-serialized parameter (i.e. `KeyQuery<K,
> V>`), while the query result be optionally raw or deserialized with the
> serde class.
> 
> 
> > 
> > 3. I agree the large switch (or if/else) (or Map) for query
> > dispatch is a concern. That's the thing I'm most worried
> > will become cumbersome. I think your idea is neat, though,
> > because a lot of our surface area is providing a bunch of
> > those different combinations of query attributes. I think if
> > we get a little meta, we can actually fold it into the
> > existing KIP.
> > 
> > Rather than making Query any more restrictive, what we could
> > do is to choose to follow your idea for the provided queries
> > we ship with Streams. Although I had been thinking we would
> > ship a KeyQuery, RangeQuery, etc., we could absolutely
> > compactify those queries as much as possible so that there
> > are only a few queries with those dimensions you listed.
> > 
> > That way we can avoid blowing up the query space with our
> > own provided queries, but we can still keep the framework as
> > general as possible.
> > 
> > 
> Sounds good!
> 
> 
> > 4. I'm not sure, actually! I just thought it would be neat
> > to have. I know I've spent my fair share of adding println
> > statements to Streams or stepping though the debugger when
> > something like that proposal would have done the job.
> > 
> > So, I guess the answer is yes, I was just thinking of it as
> > a debugging/informational tool. I also think that if we want
> > to make it more structured in the future, we should be able
> > to evolve that part of the API without and major problems.
> > 
> > 
> > 5. That's another great point, and it's a miss on my part.
> > The short answer is that we'd simply throw whatever runtime
> > exceptions are appropriate, but I should and will document
> > what they will be.
> > 
> > 
> > 6. I do think those APIs need some attention, but I was
> > actually hoping to treat that as a separate scope for design
> > work later. I think that there shouldn't be any downside to
> > tackling them as orthogonal, but I agree people will wonder
> > about the relationship there, so I can update the KIP with
> > some notes about it.
> > 
> > 
> Thanks! I personally would consider that these APIs would eventually be
> refactored as well as we stick with IQv2, and also the
> `allLocalStorePartitionLags` would be deprecated with Position.
> 
> 
> > 
> > 7. Yes, I've always been a bit on the fence about whether to
> > bundle that in here. The only thing that made me keep it in
> > is that we'd actually have to deprecate the newly proposed
> > StateStore#query method if we want to add it in later. I.e.,
> > we would just propose StateStore#query(query, executionInfo)
> > right now, but then deprecate it and add
> > StateStore#query(query, bound, executionInfo).
> > 
> > Given that, it seems mildly better to just take the leap for
> > now, and if it turns out we can't actually implement it
> > nicely, then we can always drop it from the proposal after
> > the fact.
> > 
> > That said, if that aspect is going to derail this KIP's
> > discussion, I think the lesser evil would indeed be to just
> > drop it now. So far, it seems like there's been some small
> > questions about it, but nothing that really takes us off
> > course. So, if you don't object, I think I'd like to keep it
> > in for a little while longer.
> > 
> > 
> That's a fair point, let's keep it in this KIP then.
> 
> 
> > 
> > 8. Sure, I like that idea. The names are a bit cumbersome.
> > 
> > 9. I had them as separate types so that we could more easily
> > inspect the query type. Otherwise, we'd just have to assume
> > the generics' type is byte[] in the lower layer. I'm not
> > sure that's the right call, but it also seems like the flip
> > of a coin as to which is better.
> > 
> > 10. The StateSerdes class that we have is internal. I used
> > it in the POC to save time, but I gave it a different name
> > in the KIP to make it clear that I'm proposing that we
> > create a proper public interface and not just expose the
> > internal one, which has a bunch of extra stuff in it.
> > 
> > Then again, if I go ahead and drop the serdes from the
> > propsoal entirely, we can worry about that another time!
> > 
> > 
> > 11. I think I might have a typo somewhere, because I'm not
> > following the question. The Query itself defines the result
> > type <R>, QueryResult is just a container wrapping that R
> > result as well as the execution info, etc. per partition.
> > 
> > For a KeyQuery, its signature is:
> >  KeyQuery<K, V> implements Query<V>
> > 
> > So, when you use that query, it does bind R to V, and the
> > result will be a QueryResult<V>.
> > 
> > 
> Cool thanks. My main confusion comes from the inconsistency of key-query
> and scan-query. The former implements Query as:
> 
> KeyQuery<K, V> implements Query<V>:  => binds V to R, and K unbound
> 
> Whereas the latter implements as:
> 
> ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds
> KeyValueIterator<?, ?> to R, whereas K/V both unbound
> 
> 
> 
> > 
> > 12. I considered doing exactly that. The reason I shied away
> > from it in general is that if you're going to have a "raw"
> > query API, you also need to know the key serde before you do
> > a query (otherwise you can't query at all!). So, bundling a
> > serde with the response only really applies to the value.
> > 
> > 
> See the other comment above: my thinking is actually that, for Query we
> would, potentially always, prefer to have it as in deserialized object
> format (except for partial match, which we can discuss separately), we only
> need to consider whether the QueryResult should be in raw or in
> deserialized format.
> 
> 
> > It still might be a good idea, but since I was thinking I
> > already needed a separate discovery method for the key
> > serde, then I might as well just keep the key and value
> > serdes together, rather than bundling the value serde with
> > each value.
> > 
> > I do think it would be neat to have queries that don't
> > deserialize the value by default and give you the option to
> > do it on demand, or maybe just de-structure some parts of
> > the value out (eg just reading the timestamp without
> > deserializing the rest of the value). But, now that I've
> > started to think about dropping the "raw" query design from
> > the scope of this KIP, I'm wondering if we can just consider
> > this use case later. It does seem plausible that we could
> > choose to bundle the serdes with the values for those
> > queries without needing a change in this KIP's framework, at
> > least.
> > 
> > 
> > Whew! Thanks again for the great thoughts. I'll make the
> > changes I mentioned tomorrow. Please let me know if you
> > disagree with any of my responses!
> > 
> > Thanks,
> > -John
> > 
> > On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > > Hello John,
> > > 
> > > Great, great, great writeup! :) And thank you for bringing this up
> > finally.
> > > I have made a pass on the KIP as well as the POC PR of it, here are some
> > > initial thoughts:
> > > 
> > > First are some meta ones:
> > > 
> > > 1. Today the serdes do not only happen at the metered-store layer,
> > > unfortunately. For windowed / sessioned stores, and also some newly added
> > > ones for stream-stream joins that are optimized for time-based range
> > > queries, for example, the serdes are actually composite at multiple
> > layers.
> > > And the queries on the outer interface are also translated with serde
> > > wrapped / stripped along the way in layers. To be more specific, today
> > our
> > > store hierarchy is like this:
> > > 
> > > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > > list-valued) -> inner (rocksdb, in-memory)
> > > 
> > > and serdes today could happen on the layers with * above, where each
> > layer
> > > is stuffing a bit more as prefix/suffix into the query bytes. This is not
> > > really by design or ideal, but a result of history accumulated tech
> > debts..
> > > There's a related JIRA ticket for it:
> > > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is
> > that
> > > we need to be a bit careful regarding how to implement the
> > > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy
> > roads
> > > moving forward.
> > > 
> > > 2. Related to 1 above, I think we cannot always delegate the `query()`
> > > implementation to the `inner` store layer, since some serde, or even some
> > > computation logic happens at the outer, especially the `formatted` layer.
> > > For example, besides the cached layer, the `formatted` layer also needs
> > to
> > > make sure the `query` object is being appropriately translated
> > beforeMaterialized
> > > handing it off downstreams to the inner store, and also need to translate
> > > the `queryResult` a bit while handing it upwards in the hierarchy.
> > > 
> > > 3. As we add more query types in the IQv2, the inner store's `query`
> > > instantiation may be getting very clumsy with a large "switch" condition
> > on
> > > all the possible query types. Although custom stores could consider only
> > > supporting a few, having the `default` case to ignore all others,
> > built-in
> > > stores may still need to exhaust all possible types. I'm wondering if
> > it's
> > > a good trade-off to make `Query` be more restricted on extensibility to
> > > have less exploding query type space, e.g. if a Query can only be
> > extended
> > > with some predefined dimensions like:
> > > 
> > > * query-field: key, non-key (some field extractor from the value bytes
> > need
> > > to be provided)
> > > * query-scope: single, range
> > > * query-match-type (only be useful for a range scope): prefix-match (e.g.
> > > for a range key query, the provided is only a prefix, and all keys
> > > containing this prefix should be returned), full-match
> > > * query-value-type: object, raw-bytes
> > > 
> > > 4. What's the expected usage for the execution info? Is it only for
> > logging
> > > purposes? If yes then I think not enforcing any string format is fine,
> > that
> > > the store layers can just attach any string information that they feel
> > > useful.
> > > 
> > > 5. I do not find any specific proposals for exception handling, what
> > would
> > > that look like? E.g. besides all the expected error cases like
> > non-active,
> > > how would we communicate other unexpected error cases such as store
> > closed,
> > > IO error, bad query parameters, etc?
> > > 
> > > 6. Since we do not deprecate any existing APIs in this KIP, it's a bit
> > hard
> > > for readers to understand what is eventually going to be covered by IQv2.
> > > For example, we know that eventually `KafkaStreams#store` would be gone,
> > > but what about `KafkaStreams#queryMetadataForKey`, and
> > > `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I
> > think
> > > it would be great to mention the end world state with IQv2 even if the
> > KIP
> > > itself would not deprecate anything yet.
> > > 
> > > 7. It seems people are still a bit confused about the
> > > "Position/PositionBound" topics, and personally I think it's okay to
> > > exclude them in this KIP just to keep its (already large) scope smaller.
> > > Also after we started implementing the KIP in full, we may have learned
> > new
> > > things while fighting the details in the weeds, and that would be a
> > better
> > > timing for us to consider new parameters such as bounds, but also caching
> > > bypassing, and other potential features as well.
> > > 
> > > Some minor ones:
> > > 
> > > 8. What about just naming the new classes as `StateQueryRequest/Result`,
> > or
> > > `StoreQueryRequest/Result`? The word "interactive" is for describing its
> > > semantics in docs, but I feel for class names we can use a more
> > meaningful
> > > prefix.
> > > 
> > > 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or directly
> > > implementing `Query<byte[]`>?
> > > 
> > > 10. Why do we need the new class "InteractiveQuerySerdes" along with
> > > existing classes? In your PR it seems just using `StateSerdes` directly.
> > > 
> > > 11. Why do we have a new template type "R" in the QueryResult class in
> > > addition to "<K, V>"? Should R always be equal to V?
> > > 
> > > 12. Related to 10/11 above, what about letting the QueryResult to always
> > be
> > > returning values in raw bytes, along with the serdes? And then it's up to
> > > the callers whether they want the bytes to be immediately deserialized or
> > > want them to be written somewhere and deserialized later? More
> > specifically
> > > we would only have a single function as KafkaStreams#query, and the
> > > QueryResult would be:
> > > 
> > > InteractiveQueryResult {
> > >   public InteractiveQueryResult(Map<Integer /*partition*/,
> > > QueryResult<byte[]>> partitionResults);
> > > 
> > > ...
> > > 
> > >   public StateSerdes<K, V> serdes();
> > > }
> > > 
> > > And then the result itself can also provide some built-in functions to do
> > > the deser upon returning results, so that user's code would not get more
> > > complicated. The benefit is that we end up with a single function in
> > > `KafkaStreams`, and the inner store always only need to implement the raw
> > > query types. Of course doing this would not be so easy given the fact
> > > described in 1) above, but I feel this would be a good way to first
> > > abstract away this tech debt, and then later resolve it to a single
> > place.
> > > 
> > > ---------------
> > > 
> > > Again, congrats on the very nice proposal! Let me know what you think
> > about
> > > my comments.
> > > 
> > > Guozhang
> > > 
> > > 
> > > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org>
> > wrote:
> > > 
> > > > Hi Patrick and Sagar,
> > > > 
> > > > Thanks for the feedback! I'll just break out the questions
> > > > and address them one at a time.
> > > > 
> > > > Patrick 1.
> > > > The default bound that I'm proposing is only to let active
> > > > tasks answer queries (which is also the default with IQ
> > > > today). Therefore, calling getPositionBound() would return a
> > > > PositionBound for which isLatest() is true.
> > > > 
> > > > Patrick 2.
> > > > I might have missed something in revision, but I'm not sure
> > > > what you're referring to exactly when you say they are
> > > > different. The IQRequest only has a PositionBound, and the
> > > > IQResponse only has a (concrete) Position, so I think they
> > > > are named accordingly (getPositionBound and getPosition). Am
> > > > I overlooking what you are talking about?
> > > > 
> > > > Sagar 1.
> > > > I think you're talking about the KeyValueStore#get(key)
> > > > method? This is a really good question. I went ahead and
> > > > dropped in an addendum to the KeyQuery example to show how
> > > > you would run the query in today's API. Here's a caracature
> > > > of the two APIS:
> > > > 
> > > > current:
> > > >   KeyValueStore store = kafkaStreams.store(
> > > >     "mystore",
> > > >     keyValueStore())
> > > >   int value = store.get(key);
> > > > 
> > > > proposed:
> > > >   int value = kafkaStreams.query(
> > > >     "mystore",
> > > >     KeyQuery.withKey(key));
> > > > 
> > > > So, today we first get the store interface and then we
> > > > invoke the method, and under the proposal, we would instead
> > > > just ask KafkaStreams to execute the query on the store. In
> > > > addition to all the other stuff I said in the motivation,
> > > > one thing I think is neat about this API is that it means we
> > > > can re-use queries across stores. So, for example, we could
> > > > also use KeyQuery on WindowStores, even though there's no
> > > > common interface between WindowStore and KeyValueStore.
> > > > 
> > > > In other words, stores can support any queries that make
> > > > sense and _not_ support any queries that don't make sense.
> > > > This gets into your second question...
> > > > 
> > > > Sagar 2.
> > > > Very good question. Your experience with your KIP-614
> > > > contribution was one of the things that made me want to
> > > > revise IQ to begin with. It seems like there's a really
> > > > stark gap between how straightforward the proposal is to add
> > > > a new store operation, and then how hard it is to actually
> > > > implement a new operation, due to all those intervening
> > > > wrappers.
> > > > 
> > > > There are two categories of wrappers to worry about:
> > > > - Facades: These only exist to disallow access to write
> > > > APIs, which are exposed through IQ today but shouldn't be
> > > > called. These are simply unnecessary under IQv2, since we
> > > > only run queries instead of returning the whole store.
> > > > - Store Layers: This is what you provided examples of. We
> > > > have store layers that let us compose features like
> > > > de/serialization and metering, changelogging, caching, etc.
> > > > A nice thing about this design is that we mostly don't have
> > > > to worry at all about those wrapper layers at all. Each of
> > > > these stores would simply delegate any query to lower layers
> > > > unless there is something they need to do. In my POC, I
> > > > simply added a delegating implementation to
> > > > WrappedStateStore, which meant that I didn't need to touch
> > > > most of the wrappers when I added a new query.
> > > > 
> > > > Here's what I think future contributors will have to worry
> > > > about:
> > > > 1. The basic query execution in the base byte stores
> > > > (RocksDB and InMemory)
> > > > 2. The Caching stores IF they want the query to be served
> > > > from the cache
> > > > 3. The Metered stores IF some serialization needs to be done
> > > > for the query
> > > > 
> > > > And that's it! We should be able to add new queries without
> > > > touching any other store layer besides those, and each one
> > > > of those is involved because it has some specific reason to
> > > > be.
> > > > 
> > > > 
> > > > Thanks again, Patrick and Sagar! Please let me know if I
> > > > failed to address your questions, or if you have any more.
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > > > Hi John,
> > > > > 
> > > > > Thanks for the great writeup! Couple of things I wanted to bring
> > up(may
> > > > or
> > > > > mayn't be relevant):
> > > > > 
> > > > > 1) The sample implementation that you have presented for KeyQuery is
> > very
> > > > > helpful. One thing which may be added to it is how it connects to the
> > > > > KeyValue.get(key) method. That's something that atleast I couldn't
> > > > totally
> > > > > figure out-not sure about others though. I understand that it is out
> > of
> > > > > scope of th KIP to explain for every query that IQ supports but one
> > > > > implementation just to get a sense of how the changes would feel
> > like.
> > > > > 2) The other thing that I wanted to know is that StateStore on it's
> > own
> > > > has
> > > > > a lot of implementations and some of them are wrappers, So at what
> > levels
> > > > > do users need to implement the query methods? Like a
> > MeteredKeyValueStore
> > > > > wraps RocksDbStore and calls it internally through a wrapped call.
> > As per
> > > > > the new changes, how would the scheme of things look like for the
> > same
> > > > > KeyQuery?
> > > > > 
> > > > > Thanks!
> > > > > Sagar.
> > > > > 
> > > > > 
> > > > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > > > <ps...@confluent.io.invalid>
> > > > > wrote:
> > > > > 
> > > > > > Hi John,
> > > > > > 
> > > > > > Thanks for submitting the KIP! One question I have is, assuming one
> > > > > > instantiates InteractiveQueryRequest via withQuery, and then later
> > > > calls
> > > > > > getPositionBound, what will the result be? Also I noticed the
> > Position
> > > > > > returning method is in InteractiveQueryRequest and
> > > > InteractiveQueryResult
> > > > > > is named differently, any particular reason?
> > > > > > 
> > > > > > Best,
> > > > > >   Patrick
> > > > > > 
> > > > > > 
> > > > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vvcephei@apache.org
> > > 
> > > > wrote:
> > > > > > 
> > > > > > > Thanks for taking a look, Sophie!
> > > > > > > 
> > > > > > > Ah, that was a revision error. I had initially been planning
> > > > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > > > fetch all partitions, but then decided it was needlessly
> > > > > > > complex and changed it to the current proposal with two
> > > > > > > methods:
> > > > > > > 
> > > > > > > boolean isAllPartitions();
> > > > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > > > exception if it's an "all partitions" request).
> > > > > > > 
> > > > > > > I've corrected the javadoc and also documented the
> > > > > > > exception.
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > -John
> > > > > > > 
> > > > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > > > wrote:
> > > > > > > > Thanks John, I've been looking forward to this for a while
> > now. It
> > > > > > > > was pretty horrifying to learn
> > > > > > > > how present-day IQ works  (or rather, doesn't work) with custom
> > > > state
> > > > > > > > stores :/
> > > > > > > > 
> > > > > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> > > > the #
> > > > > > > > getPartitions
> > > > > > > > method has a return type of Set<Integer>, but the javadocs
> > refer to
> > > > > > > Optional.
> > > > > > > > Not
> > > > > > > > sure which is intended for this API, but if is supposed to be
> > the
> > > > > > return
> > > > > > > > type, do you perhaps
> > > > > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> > > > set)
> > > > > > > > rather than Optional.of(empty set) and Optional.of(non-empty
> > set) ?
> > > > > > > > 
> > > > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <
> > vvcephei@apache.org
> > > > > 
> > > > > > > wrote:
> > > > > > > > 
> > > > > > > > > Hello again, all,
> > > > > > > > > 
> > > > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > > > 
> > > > > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > > > > Monday.
> > > > > > > > > 
> > > > > > > > > Thanks!
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > > > Hello all,
> > > > > > > > > > 
> > > > > > > > > > I'd like to start the discussion for KIP-796, which
> > proposes
> > > > > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > > > > > 
> > > > > > > > > > The proposal is here:
> > > > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > > > 
> > > > > > > > > > I look forward to your feedback!
> > > > > > > > > > 
> > > > > > > > > > Thank you,
> > > > > > > > > > -John
> > > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > 
> > > > 
> > > > 
> > > 
> > 
> > 
> 


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks John! Some more thoughts inlined below.

On Mon, Nov 15, 2021 at 10:07 PM John Roesler <vv...@apache.org> wrote:

> Thanks for the review, Guozhang!
>
> 1. This is a great point. I fell into the age-old trap of
> only considering the simplest store type and forgot about
> this extra wrinkle of the "key schema" that we use in
> Windowed and Session stores.
>
> Depending on how we want to forge forward with our provided
> queries, I think it can still work out ok. The simplest
> solution is just to have windowed versions of our queries
> for use on the windowed stores. That should work naively
> because we're basically just preserving the existing
> interactions. It might not be ideal in the long run, but at
> least it lets us make IQv2 orthogonal from other efforts to
> simplify the stores themselves.
>
> If we do that, then it would actually be correct to go ahead
> and just return the serdes that are present in the Metered
> stores today. For example, if I have a Windowed store with
> Integer keys, then the key serde I get from serdesForStore
> would just be the IntegerSerde. The query I'd use the
> serialized key with would be a RawWindowedKeyQuery, which
> takes a byte[] key and a timestamp. Then, the low-level
> store (the segmented store in this case) would have to take
> the next step to use its schema before making that last-mile
> query. Note, this is precisely how fetch is implemented
> today in RocksDBWindowStore:
>
> public byte[] fetch(final Bytes key, final long timestamp) {
>   return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
> timestamp, seqnum));
> }
>
> In other words, if we set up our provided Query types to
> stick close to the current store query methods, then
> everything "should work out" (tm).
>
> I think where things start to get more complicated would be
> if we wanted to expose the actual, raw, on-disk binary key
> to the user, along with a serde that can interpret it. Then,
> we would have to pack up the serde and the schema. If we go
> down that road, then knowing which one (the key serde or the
> windowed schema + the key serde) the user wants when they
> ask for "the serde" would be a challenge.
>
> I'm actually thinking maybe we don't need to include the
> serdesForStore method in this proposal, but instead leave it
> for follow-on work (if desired) to add it along with raw
> queries, since it's really only needed if you want raw
> queries and (as you mentioned later) there may be better
> ways to present the serdes, which is always easier to figure
> out once there's a use case.
>
>
> 2. Hmm, if I understand what you mean by the "formatted"
> layer, is that the one supplied by the
> WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
> Materialized? I think the basic idea of this proposal is to
> let whatever store gets supplied there be the "last stop"
> for the query.
>
> For the case of our default windowed store, this is the
> segmented RocksDB store. It's true that this store "wraps" a
> bunch of segments, but it would be the segmented store's
> responsibility to handle the query, not defer to the
> segments. This might mean different things for different
> queries, but naively, I think it can just invoke to the
> current implementation of each of its methods.
>
> There might be future queries that require more
> sophisticated responses, but we should be able to add new
> queries for those, which have no restrictions on their
> response types. For example, we could choose to respond to a
> scan with a list of iterators, one for each segment.
>
>
For `formatted` stores, I also mean the ListValueStore which was added
recently for stream-stream joins, for example. The interface is a KV-store
but that disables same-key overwrites but instead keep all the values of
the same key as a list, and users can only delete old values by deleting
the whole key-list (which would of course delete new values as well).
ListValueStore uses a KeyValueStore as its inner, but would convert the put
call as append.

I think in the long run, we should have a different interface other than
KVStore for this type, and the implementation would then be at the
`formatted` store layer. That means the `query` should be always
implemented at the inner layer of the logged store (that could be the most
`inner` store, or the `fomatted` store), and upper level wrapped stores
would be delegating to the inner stores.

As for serdes, here's some more second thoughts: generally speaking, it's
always convenient for users to pass in the value as object than raw bytes,
the only exception is if the query is not for exact matching but prefix (or
suffix, though we do not have such cases today) matching, in which case we
would need the raw bytes in order to pass in the prefixed bytes into the
inner store. The returned value though could either be preferred as raw
bytes, or be deserialized already.

The composite-serde mostly happens at the key, but not much at the value
(we only have "value-timestamp" type which needs a composite
deserialization, all others are direct values). So I'm feeling that a Query
would be best represented with non-serialized parameter (i.e. `KeyQuery<K,
V>`), while the query result be optionally raw or deserialized with the
serde class.


>
> 3. I agree the large switch (or if/else) (or Map) for query
> dispatch is a concern. That's the thing I'm most worried
> will become cumbersome. I think your idea is neat, though,
> because a lot of our surface area is providing a bunch of
> those different combinations of query attributes. I think if
> we get a little meta, we can actually fold it into the
> existing KIP.
>
> Rather than making Query any more restrictive, what we could
> do is to choose to follow your idea for the provided queries
> we ship with Streams. Although I had been thinking we would
> ship a KeyQuery, RangeQuery, etc., we could absolutely
> compactify those queries as much as possible so that there
> are only a few queries with those dimensions you listed.
>
> That way we can avoid blowing up the query space with our
> own provided queries, but we can still keep the framework as
> general as possible.
>
>
Sounds good!


> 4. I'm not sure, actually! I just thought it would be neat
> to have. I know I've spent my fair share of adding println
> statements to Streams or stepping though the debugger when
> something like that proposal would have done the job.
>
> So, I guess the answer is yes, I was just thinking of it as
> a debugging/informational tool. I also think that if we want
> to make it more structured in the future, we should be able
> to evolve that part of the API without and major problems.
>
>
> 5. That's another great point, and it's a miss on my part.
> The short answer is that we'd simply throw whatever runtime
> exceptions are appropriate, but I should and will document
> what they will be.
>
>
> 6. I do think those APIs need some attention, but I was
> actually hoping to treat that as a separate scope for design
> work later. I think that there shouldn't be any downside to
> tackling them as orthogonal, but I agree people will wonder
> about the relationship there, so I can update the KIP with
> some notes about it.
>
>
Thanks! I personally would consider that these APIs would eventually be
refactored as well as we stick with IQv2, and also the
`allLocalStorePartitionLags` would be deprecated with Position.


>
> 7. Yes, I've always been a bit on the fence about whether to
> bundle that in here. The only thing that made me keep it in
> is that we'd actually have to deprecate the newly proposed
> StateStore#query method if we want to add it in later. I.e.,
> we would just propose StateStore#query(query, executionInfo)
> right now, but then deprecate it and add
> StateStore#query(query, bound, executionInfo).
>
> Given that, it seems mildly better to just take the leap for
> now, and if it turns out we can't actually implement it
> nicely, then we can always drop it from the proposal after
> the fact.
>
> That said, if that aspect is going to derail this KIP's
> discussion, I think the lesser evil would indeed be to just
> drop it now. So far, it seems like there's been some small
> questions about it, but nothing that really takes us off
> course. So, if you don't object, I think I'd like to keep it
> in for a little while longer.
>
>
That's a fair point, let's keep it in this KIP then.


>
> 8. Sure, I like that idea. The names are a bit cumbersome.
>
> 9. I had them as separate types so that we could more easily
> inspect the query type. Otherwise, we'd just have to assume
> the generics' type is byte[] in the lower layer. I'm not
> sure that's the right call, but it also seems like the flip
> of a coin as to which is better.
>
> 10. The StateSerdes class that we have is internal. I used
> it in the POC to save time, but I gave it a different name
> in the KIP to make it clear that I'm proposing that we
> create a proper public interface and not just expose the
> internal one, which has a bunch of extra stuff in it.
>
> Then again, if I go ahead and drop the serdes from the
> propsoal entirely, we can worry about that another time!
>
>
> 11. I think I might have a typo somewhere, because I'm not
> following the question. The Query itself defines the result
> type <R>, QueryResult is just a container wrapping that R
> result as well as the execution info, etc. per partition.
>
> For a KeyQuery, its signature is:
>  KeyQuery<K, V> implements Query<V>
>
> So, when you use that query, it does bind R to V, and the
> result will be a QueryResult<V>.
>
>
Cool thanks. My main confusion comes from the inconsistency of key-query
and scan-query. The former implements Query as:

KeyQuery<K, V> implements Query<V>:  => binds V to R, and K unbound

Whereas the latter implements as:

ScanQuery<K, V> implements Query<KeyValueIterator<K, V>>: => binds
KeyValueIterator<?, ?> to R, whereas K/V both unbound



>
> 12. I considered doing exactly that. The reason I shied away
> from it in general is that if you're going to have a "raw"
> query API, you also need to know the key serde before you do
> a query (otherwise you can't query at all!). So, bundling a
> serde with the response only really applies to the value.
>
>
See the other comment above: my thinking is actually that, for Query we
would, potentially always, prefer to have it as in deserialized object
format (except for partial match, which we can discuss separately), we only
need to consider whether the QueryResult should be in raw or in
deserialized format.


> It still might be a good idea, but since I was thinking I
> already needed a separate discovery method for the key
> serde, then I might as well just keep the key and value
> serdes together, rather than bundling the value serde with
> each value.
>
> I do think it would be neat to have queries that don't
> deserialize the value by default and give you the option to
> do it on demand, or maybe just de-structure some parts of
> the value out (eg just reading the timestamp without
> deserializing the rest of the value). But, now that I've
> started to think about dropping the "raw" query design from
> the scope of this KIP, I'm wondering if we can just consider
> this use case later. It does seem plausible that we could
> choose to bundle the serdes with the values for those
> queries without needing a change in this KIP's framework, at
> least.
>
>
> Whew! Thanks again for the great thoughts. I'll make the
> changes I mentioned tomorrow. Please let me know if you
> disagree with any of my responses!
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> > Hello John,
> >
> > Great, great, great writeup! :) And thank you for bringing this up
> finally.
> > I have made a pass on the KIP as well as the POC PR of it, here are some
> > initial thoughts:
> >
> > First are some meta ones:
> >
> > 1. Today the serdes do not only happen at the metered-store layer,
> > unfortunately. For windowed / sessioned stores, and also some newly added
> > ones for stream-stream joins that are optimized for time-based range
> > queries, for example, the serdes are actually composite at multiple
> layers.
> > And the queries on the outer interface are also translated with serde
> > wrapped / stripped along the way in layers. To be more specific, today
> our
> > store hierarchy is like this:
> >
> > metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> > list-valued) -> inner (rocksdb, in-memory)
> >
> > and serdes today could happen on the layers with * above, where each
> layer
> > is stuffing a bit more as prefix/suffix into the query bytes. This is not
> > really by design or ideal, but a result of history accumulated tech
> debts..
> > There's a related JIRA ticket for it:
> > https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is
> that
> > we need to be a bit careful regarding how to implement the
> > `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy
> roads
> > moving forward.
> >
> > 2. Related to 1 above, I think we cannot always delegate the `query()`
> > implementation to the `inner` store layer, since some serde, or even some
> > computation logic happens at the outer, especially the `formatted` layer.
> > For example, besides the cached layer, the `formatted` layer also needs
> to
> > make sure the `query` object is being appropriately translated
> beforeMaterialized
> > handing it off downstreams to the inner store, and also need to translate
> > the `queryResult` a bit while handing it upwards in the hierarchy.
> >
> > 3. As we add more query types in the IQv2, the inner store's `query`
> > instantiation may be getting very clumsy with a large "switch" condition
> on
> > all the possible query types. Although custom stores could consider only
> > supporting a few, having the `default` case to ignore all others,
> built-in
> > stores may still need to exhaust all possible types. I'm wondering if
> it's
> > a good trade-off to make `Query` be more restricted on extensibility to
> > have less exploding query type space, e.g. if a Query can only be
> extended
> > with some predefined dimensions like:
> >
> > * query-field: key, non-key (some field extractor from the value bytes
> need
> > to be provided)
> > * query-scope: single, range
> > * query-match-type (only be useful for a range scope): prefix-match (e.g.
> > for a range key query, the provided is only a prefix, and all keys
> > containing this prefix should be returned), full-match
> > * query-value-type: object, raw-bytes
> >
> > 4. What's the expected usage for the execution info? Is it only for
> logging
> > purposes? If yes then I think not enforcing any string format is fine,
> that
> > the store layers can just attach any string information that they feel
> > useful.
> >
> > 5. I do not find any specific proposals for exception handling, what
> would
> > that look like? E.g. besides all the expected error cases like
> non-active,
> > how would we communicate other unexpected error cases such as store
> closed,
> > IO error, bad query parameters, etc?
> >
> > 6. Since we do not deprecate any existing APIs in this KIP, it's a bit
> hard
> > for readers to understand what is eventually going to be covered by IQv2.
> > For example, we know that eventually `KafkaStreams#store` would be gone,
> > but what about `KafkaStreams#queryMetadataForKey`, and
> > `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I
> think
> > it would be great to mention the end world state with IQv2 even if the
> KIP
> > itself would not deprecate anything yet.
> >
> > 7. It seems people are still a bit confused about the
> > "Position/PositionBound" topics, and personally I think it's okay to
> > exclude them in this KIP just to keep its (already large) scope smaller.
> > Also after we started implementing the KIP in full, we may have learned
> new
> > things while fighting the details in the weeds, and that would be a
> better
> > timing for us to consider new parameters such as bounds, but also caching
> > bypassing, and other potential features as well.
> >
> > Some minor ones:
> >
> > 8. What about just naming the new classes as `StateQueryRequest/Result`,
> or
> > `StoreQueryRequest/Result`? The word "interactive" is for describing its
> > semantics in docs, but I feel for class names we can use a more
> meaningful
> > prefix.
> >
> > 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or directly
> > implementing `Query<byte[]`>?
> >
> > 10. Why do we need the new class "InteractiveQuerySerdes" along with
> > existing classes? In your PR it seems just using `StateSerdes` directly.
> >
> > 11. Why do we have a new template type "R" in the QueryResult class in
> > addition to "<K, V>"? Should R always be equal to V?
> >
> > 12. Related to 10/11 above, what about letting the QueryResult to always
> be
> > returning values in raw bytes, along with the serdes? And then it's up to
> > the callers whether they want the bytes to be immediately deserialized or
> > want them to be written somewhere and deserialized later? More
> specifically
> > we would only have a single function as KafkaStreams#query, and the
> > QueryResult would be:
> >
> > InteractiveQueryResult {
> >   public InteractiveQueryResult(Map<Integer /*partition*/,
> > QueryResult<byte[]>> partitionResults);
> >
> > ...
> >
> >   public StateSerdes<K, V> serdes();
> > }
> >
> > And then the result itself can also provide some built-in functions to do
> > the deser upon returning results, so that user's code would not get more
> > complicated. The benefit is that we end up with a single function in
> > `KafkaStreams`, and the inner store always only need to implement the raw
> > query types. Of course doing this would not be so easy given the fact
> > described in 1) above, but I feel this would be a good way to first
> > abstract away this tech debt, and then later resolve it to a single
> place.
> >
> > ---------------
> >
> > Again, congrats on the very nice proposal! Let me know what you think
> about
> > my comments.
> >
> > Guozhang
> >
> >
> > On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Hi Patrick and Sagar,
> > >
> > > Thanks for the feedback! I'll just break out the questions
> > > and address them one at a time.
> > >
> > > Patrick 1.
> > > The default bound that I'm proposing is only to let active
> > > tasks answer queries (which is also the default with IQ
> > > today). Therefore, calling getPositionBound() would return a
> > > PositionBound for which isLatest() is true.
> > >
> > > Patrick 2.
> > > I might have missed something in revision, but I'm not sure
> > > what you're referring to exactly when you say they are
> > > different. The IQRequest only has a PositionBound, and the
> > > IQResponse only has a (concrete) Position, so I think they
> > > are named accordingly (getPositionBound and getPosition). Am
> > > I overlooking what you are talking about?
> > >
> > > Sagar 1.
> > > I think you're talking about the KeyValueStore#get(key)
> > > method? This is a really good question. I went ahead and
> > > dropped in an addendum to the KeyQuery example to show how
> > > you would run the query in today's API. Here's a caracature
> > > of the two APIS:
> > >
> > > current:
> > >   KeyValueStore store = kafkaStreams.store(
> > >     "mystore",
> > >     keyValueStore())
> > >   int value = store.get(key);
> > >
> > > proposed:
> > >   int value = kafkaStreams.query(
> > >     "mystore",
> > >     KeyQuery.withKey(key));
> > >
> > > So, today we first get the store interface and then we
> > > invoke the method, and under the proposal, we would instead
> > > just ask KafkaStreams to execute the query on the store. In
> > > addition to all the other stuff I said in the motivation,
> > > one thing I think is neat about this API is that it means we
> > > can re-use queries across stores. So, for example, we could
> > > also use KeyQuery on WindowStores, even though there's no
> > > common interface between WindowStore and KeyValueStore.
> > >
> > > In other words, stores can support any queries that make
> > > sense and _not_ support any queries that don't make sense.
> > > This gets into your second question...
> > >
> > > Sagar 2.
> > > Very good question. Your experience with your KIP-614
> > > contribution was one of the things that made me want to
> > > revise IQ to begin with. It seems like there's a really
> > > stark gap between how straightforward the proposal is to add
> > > a new store operation, and then how hard it is to actually
> > > implement a new operation, due to all those intervening
> > > wrappers.
> > >
> > > There are two categories of wrappers to worry about:
> > > - Facades: These only exist to disallow access to write
> > > APIs, which are exposed through IQ today but shouldn't be
> > > called. These are simply unnecessary under IQv2, since we
> > > only run queries instead of returning the whole store.
> > > - Store Layers: This is what you provided examples of. We
> > > have store layers that let us compose features like
> > > de/serialization and metering, changelogging, caching, etc.
> > > A nice thing about this design is that we mostly don't have
> > > to worry at all about those wrapper layers at all. Each of
> > > these stores would simply delegate any query to lower layers
> > > unless there is something they need to do. In my POC, I
> > > simply added a delegating implementation to
> > > WrappedStateStore, which meant that I didn't need to touch
> > > most of the wrappers when I added a new query.
> > >
> > > Here's what I think future contributors will have to worry
> > > about:
> > > 1. The basic query execution in the base byte stores
> > > (RocksDB and InMemory)
> > > 2. The Caching stores IF they want the query to be served
> > > from the cache
> > > 3. The Metered stores IF some serialization needs to be done
> > > for the query
> > >
> > > And that's it! We should be able to add new queries without
> > > touching any other store layer besides those, and each one
> > > of those is involved because it has some specific reason to
> > > be.
> > >
> > >
> > > Thanks again, Patrick and Sagar! Please let me know if I
> > > failed to address your questions, or if you have any more.
> > >
> > > Thanks,
> > > -John
> > >
> > > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > > Hi John,
> > > >
> > > > Thanks for the great writeup! Couple of things I wanted to bring
> up(may
> > > or
> > > > mayn't be relevant):
> > > >
> > > > 1) The sample implementation that you have presented for KeyQuery is
> very
> > > > helpful. One thing which may be added to it is how it connects to the
> > > > KeyValue.get(key) method. That's something that atleast I couldn't
> > > totally
> > > > figure out-not sure about others though. I understand that it is out
> of
> > > > scope of th KIP to explain for every query that IQ supports but one
> > > > implementation just to get a sense of how the changes would feel
> like.
> > > > 2) The other thing that I wanted to know is that StateStore on it's
> own
> > > has
> > > > a lot of implementations and some of them are wrappers, So at what
> levels
> > > > do users need to implement the query methods? Like a
> MeteredKeyValueStore
> > > > wraps RocksDbStore and calls it internally through a wrapped call.
> As per
> > > > the new changes, how would the scheme of things look like for the
> same
> > > > KeyQuery?
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > >
> > > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > > <ps...@confluent.io.invalid>
> > > > wrote:
> > > >
> > > > > Hi John,
> > > > >
> > > > > Thanks for submitting the KIP! One question I have is, assuming one
> > > > > instantiates InteractiveQueryRequest via withQuery, and then later
> > > calls
> > > > > getPositionBound, what will the result be? Also I noticed the
> Position
> > > > > returning method is in InteractiveQueryRequest and
> > > InteractiveQueryResult
> > > > > is named differently, any particular reason?
> > > > >
> > > > > Best,
> > > > >   Patrick
> > > > >
> > > > >
> > > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vvcephei@apache.org
> >
> > > wrote:
> > > > >
> > > > > > Thanks for taking a look, Sophie!
> > > > > >
> > > > > > Ah, that was a revision error. I had initially been planning
> > > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > > fetch all partitions, but then decided it was needlessly
> > > > > > complex and changed it to the current proposal with two
> > > > > > methods:
> > > > > >
> > > > > > boolean isAllPartitions();
> > > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > > exception if it's an "all partitions" request).
> > > > > >
> > > > > > I've corrected the javadoc and also documented the
> > > > > > exception.
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > > wrote:
> > > > > > > Thanks John, I've been looking forward to this for a while
> now. It
> > > > > > > was pretty horrifying to learn
> > > > > > > how present-day IQ works  (or rather, doesn't work) with custom
> > > state
> > > > > > > stores :/
> > > > > > >
> > > > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> > > the #
> > > > > > > getPartitions
> > > > > > > method has a return type of Set<Integer>, but the javadocs
> refer to
> > > > > > Optional.
> > > > > > > Not
> > > > > > > sure which is intended for this API, but if is supposed to be
> the
> > > > > return
> > > > > > > type, do you perhaps
> > > > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> > > set)
> > > > > > > rather than Optional.of(empty set) and Optional.of(non-empty
> set) ?
> > > > > > >
> > > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <
> vvcephei@apache.org
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hello again, all,
> > > > > > > >
> > > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > >
> > > > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > > > Monday.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > > Hello all,
> > > > > > > > >
> > > > > > > > > I'd like to start the discussion for KIP-796, which
> proposes
> > > > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > > > >
> > > > > > > > > The proposal is here:
> > > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > >
> > > > > > > > > I look forward to your feedback!
> > > > > > > > >
> > > > > > > > > Thank you,
> > > > > > > > > -John
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > >
> > >
> >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Thanks for the review, Guozhang!

1. This is a great point. I fell into the age-old trap of
only considering the simplest store type and forgot about
this extra wrinkle of the "key schema" that we use in
Windowed and Session stores.

Depending on how we want to forge forward with our provided
queries, I think it can still work out ok. The simplest
solution is just to have windowed versions of our queries
for use on the windowed stores. That should work naively
because we're basically just preserving the existing
interactions. It might not be ideal in the long run, but at
least it lets us make IQv2 orthogonal from other efforts to
simplify the stores themselves.

If we do that, then it would actually be correct to go ahead
and just return the serdes that are present in the Metered
stores today. For example, if I have a Windowed store with
Integer keys, then the key serde I get from serdesForStore
would just be the IntegerSerde. The query I'd use the
serialized key with would be a RawWindowedKeyQuery, which
takes a byte[] key and a timestamp. Then, the low-level
store (the segmented store in this case) would have to take
the next step to use its schema before making that last-mile
query. Note, this is precisely how fetch is implemented
today in RocksDBWindowStore:

public byte[] fetch(final Bytes key, final long timestamp) {
  return wrapped().get(WindowKeySchema.toStoreKeyBinary(key,
timestamp, seqnum));
}

In other words, if we set up our provided Query types to
stick close to the current store query methods, then
everything "should work out" (tm).

I think where things start to get more complicated would be
if we wanted to expose the actual, raw, on-disk binary key
to the user, along with a serde that can interpret it. Then,
we would have to pack up the serde and the schema. If we go
down that road, then knowing which one (the key serde or the
windowed schema + the key serde) the user wants when they
ask for "the serde" would be a challenge.

I'm actually thinking maybe we don't need to include the
serdesForStore method in this proposal, but instead leave it
for follow-on work (if desired) to add it along with raw
queries, since it's really only needed if you want raw
queries and (as you mentioned later) there may be better
ways to present the serdes, which is always easier to figure
out once there's a use case.


2. Hmm, if I understand what you mean by the "formatted"
layer, is that the one supplied by the
WindowedBytesStoreSupplier or SessionBytesStoreSupplier in
Materialized? I think the basic idea of this proposal is to
let whatever store gets supplied there be the "last stop"
for the query.

For the case of our default windowed store, this is the
segmented RocksDB store. It's true that this store "wraps" a
bunch of segments, but it would be the segmented store's
responsibility to handle the query, not defer to the
segments. This might mean different things for different
queries, but naively, I think it can just invoke to the
current implementation of each of its methods.

There might be future queries that require more
sophisticated responses, but we should be able to add new
queries for those, which have no restrictions on their
response types. For example, we could choose to respond to a
scan with a list of iterators, one for each segment.


3. I agree the large switch (or if/else) (or Map) for query
dispatch is a concern. That's the thing I'm most worried
will become cumbersome. I think your idea is neat, though,
because a lot of our surface area is providing a bunch of
those different combinations of query attributes. I think if
we get a little meta, we can actually fold it into the
existing KIP.

Rather than making Query any more restrictive, what we could
do is to choose to follow your idea for the provided queries
we ship with Streams. Although I had been thinking we would
ship a KeyQuery, RangeQuery, etc., we could absolutely
compactify those queries as much as possible so that there
are only a few queries with those dimensions you listed.

That way we can avoid blowing up the query space with our
own provided queries, but we can still keep the framework as
general as possible.

4. I'm not sure, actually! I just thought it would be neat
to have. I know I've spent my fair share of adding println
statements to Streams or stepping though the debugger when
something like that proposal would have done the job.

So, I guess the answer is yes, I was just thinking of it as
a debugging/informational tool. I also think that if we want
to make it more structured in the future, we should be able
to evolve that part of the API without and major problems.


5. That's another great point, and it's a miss on my part.
The short answer is that we'd simply throw whatever runtime
exceptions are appropriate, but I should and will document
what they will be.


6. I do think those APIs need some attention, but I was
actually hoping to treat that as a separate scope for design
work later. I think that there shouldn't be any downside to
tackling them as orthogonal, but I agree people will wonder
about the relationship there, so I can update the KIP with
some notes about it.


7. Yes, I've always been a bit on the fence about whether to
bundle that in here. The only thing that made me keep it in
is that we'd actually have to deprecate the newly proposed
StateStore#query method if we want to add it in later. I.e.,
we would just propose StateStore#query(query, executionInfo)
right now, but then deprecate it and add
StateStore#query(query, bound, executionInfo).

Given that, it seems mildly better to just take the leap for
now, and if it turns out we can't actually implement it
nicely, then we can always drop it from the proposal after
the fact.

That said, if that aspect is going to derail this KIP's
discussion, I think the lesser evil would indeed be to just
drop it now. So far, it seems like there's been some small
questions about it, but nothing that really takes us off
course. So, if you don't object, I think I'd like to keep it
in for a little while longer.


8. Sure, I like that idea. The names are a bit cumbersome.

9. I had them as separate types so that we could more easily
inspect the query type. Otherwise, we'd just have to assume
the generics' type is byte[] in the lower layer. I'm not
sure that's the right call, but it also seems like the flip
of a coin as to which is better.

10. The StateSerdes class that we have is internal. I used
it in the POC to save time, but I gave it a different name
in the KIP to make it clear that I'm proposing that we
create a proper public interface and not just expose the
internal one, which has a bunch of extra stuff in it.

Then again, if I go ahead and drop the serdes from the
propsoal entirely, we can worry about that another time!


11. I think I might have a typo somewhere, because I'm not
following the question. The Query itself defines the result
type <R>, QueryResult is just a container wrapping that R
result as well as the execution info, etc. per partition.

For a KeyQuery, its signature is:
 KeyQuery<K, V> implements Query<V>

So, when you use that query, it does bind R to V, and the
result will be a QueryResult<V>.


12. I considered doing exactly that. The reason I shied away
from it in general is that if you're going to have a "raw"
query API, you also need to know the key serde before you do
a query (otherwise you can't query at all!). So, bundling a
serde with the response only really applies to the value.

It still might be a good idea, but since I was thinking I
already needed a separate discovery method for the key
serde, then I might as well just keep the key and value
serdes together, rather than bundling the value serde with
each value.

I do think it would be neat to have queries that don't
deserialize the value by default and give you the option to
do it on demand, or maybe just de-structure some parts of
the value out (eg just reading the timestamp without
deserializing the rest of the value). But, now that I've
started to think about dropping the "raw" query design from
the scope of this KIP, I'm wondering if we can just consider
this use case later. It does seem plausible that we could
choose to bundle the serdes with the values for those
queries without needing a change in this KIP's framework, at
least.


Whew! Thanks again for the great thoughts. I'll make the
changes I mentioned tomorrow. Please let me know if you
disagree with any of my responses!

Thanks,
-John

On Mon, 2021-11-15 at 17:29 -0800, Guozhang Wang wrote:
> Hello John,
> 
> Great, great, great writeup! :) And thank you for bringing this up finally.
> I have made a pass on the KIP as well as the POC PR of it, here are some
> initial thoughts:
> 
> First are some meta ones:
> 
> 1. Today the serdes do not only happen at the metered-store layer,
> unfortunately. For windowed / sessioned stores, and also some newly added
> ones for stream-stream joins that are optimized for time-based range
> queries, for example, the serdes are actually composite at multiple layers.
> And the queries on the outer interface are also translated with serde
> wrapped / stripped along the way in layers. To be more specific, today our
> store hierarchy is like this:
> 
> metered * -> cached -> logged * -> formatted * (e.g. segmenged,
> list-valued) -> inner (rocksdb, in-memory)
> 
> and serdes today could happen on the layers with * above, where each layer
> is stuffing a bit more as prefix/suffix into the query bytes. This is not
> really by design or ideal, but a result of history accumulated tech debts..
> There's a related JIRA ticket for it:
> https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is that
> we need to be a bit careful regarding how to implement the
> `KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy roads
> moving forward.
> 
> 2. Related to 1 above, I think we cannot always delegate the `query()`
> implementation to the `inner` store layer, since some serde, or even some
> computation logic happens at the outer, especially the `formatted` layer.
> For example, besides the cached layer, the `formatted` layer also needs to
> make sure the `query` object is being appropriately translated beforeMaterialized
> handing it off downstreams to the inner store, and also need to translate
> the `queryResult` a bit while handing it upwards in the hierarchy.
> 
> 3. As we add more query types in the IQv2, the inner store's `query`
> instantiation may be getting very clumsy with a large "switch" condition on
> all the possible query types. Although custom stores could consider only
> supporting a few, having the `default` case to ignore all others, built-in
> stores may still need to exhaust all possible types. I'm wondering if it's
> a good trade-off to make `Query` be more restricted on extensibility to
> have less exploding query type space, e.g. if a Query can only be extended
> with some predefined dimensions like:
> 
> * query-field: key, non-key (some field extractor from the value bytes need
> to be provided)
> * query-scope: single, range
> * query-match-type (only be useful for a range scope): prefix-match (e.g.
> for a range key query, the provided is only a prefix, and all keys
> containing this prefix should be returned), full-match
> * query-value-type: object, raw-bytes
> 
> 4. What's the expected usage for the execution info? Is it only for logging
> purposes? If yes then I think not enforcing any string format is fine, that
> the store layers can just attach any string information that they feel
> useful.
> 
> 5. I do not find any specific proposals for exception handling, what would
> that look like? E.g. besides all the expected error cases like non-active,
> how would we communicate other unexpected error cases such as store closed,
> IO error, bad query parameters, etc?
> 
> 6. Since we do not deprecate any existing APIs in this KIP, it's a bit hard
> for readers to understand what is eventually going to be covered by IQv2.
> For example, we know that eventually `KafkaStreams#store` would be gone,
> but what about `KafkaStreams#queryMetadataForKey`, and
> `#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I think
> it would be great to mention the end world state with IQv2 even if the KIP
> itself would not deprecate anything yet.
> 
> 7. It seems people are still a bit confused about the
> "Position/PositionBound" topics, and personally I think it's okay to
> exclude them in this KIP just to keep its (already large) scope smaller.
> Also after we started implementing the KIP in full, we may have learned new
> things while fighting the details in the weeds, and that would be a better
> timing for us to consider new parameters such as bounds, but also caching
> bypassing, and other potential features as well.
> 
> Some minor ones:
> 
> 8. What about just naming the new classes as `StateQueryRequest/Result`, or
> `StoreQueryRequest/Result`? The word "interactive" is for describing its
> semantics in docs, but I feel for class names we can use a more meaningful
> prefix.
> 
> 9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or directly
> implementing `Query<byte[]`>?
> 
> 10. Why do we need the new class "InteractiveQuerySerdes" along with
> existing classes? In your PR it seems just using `StateSerdes` directly.
> 
> 11. Why do we have a new template type "R" in the QueryResult class in
> addition to "<K, V>"? Should R always be equal to V?
> 
> 12. Related to 10/11 above, what about letting the QueryResult to always be
> returning values in raw bytes, along with the serdes? And then it's up to
> the callers whether they want the bytes to be immediately deserialized or
> want them to be written somewhere and deserialized later? More specifically
> we would only have a single function as KafkaStreams#query, and the
> QueryResult would be:
> 
> InteractiveQueryResult {
>   public InteractiveQueryResult(Map<Integer /*partition*/,
> QueryResult<byte[]>> partitionResults);
> 
> ...
> 
>   public StateSerdes<K, V> serdes();
> }
> 
> And then the result itself can also provide some built-in functions to do
> the deser upon returning results, so that user's code would not get more
> complicated. The benefit is that we end up with a single function in
> `KafkaStreams`, and the inner store always only need to implement the raw
> query types. Of course doing this would not be so easy given the fact
> described in 1) above, but I feel this would be a good way to first
> abstract away this tech debt, and then later resolve it to a single place.
> 
> ---------------
> 
> Again, congrats on the very nice proposal! Let me know what you think about
> my comments.
> 
> Guozhang
> 
> 
> On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org> wrote:
> 
> > Hi Patrick and Sagar,
> > 
> > Thanks for the feedback! I'll just break out the questions
> > and address them one at a time.
> > 
> > Patrick 1.
> > The default bound that I'm proposing is only to let active
> > tasks answer queries (which is also the default with IQ
> > today). Therefore, calling getPositionBound() would return a
> > PositionBound for which isLatest() is true.
> > 
> > Patrick 2.
> > I might have missed something in revision, but I'm not sure
> > what you're referring to exactly when you say they are
> > different. The IQRequest only has a PositionBound, and the
> > IQResponse only has a (concrete) Position, so I think they
> > are named accordingly (getPositionBound and getPosition). Am
> > I overlooking what you are talking about?
> > 
> > Sagar 1.
> > I think you're talking about the KeyValueStore#get(key)
> > method? This is a really good question. I went ahead and
> > dropped in an addendum to the KeyQuery example to show how
> > you would run the query in today's API. Here's a caracature
> > of the two APIS:
> > 
> > current:
> >   KeyValueStore store = kafkaStreams.store(
> >     "mystore",
> >     keyValueStore())
> >   int value = store.get(key);
> > 
> > proposed:
> >   int value = kafkaStreams.query(
> >     "mystore",
> >     KeyQuery.withKey(key));
> > 
> > So, today we first get the store interface and then we
> > invoke the method, and under the proposal, we would instead
> > just ask KafkaStreams to execute the query on the store. In
> > addition to all the other stuff I said in the motivation,
> > one thing I think is neat about this API is that it means we
> > can re-use queries across stores. So, for example, we could
> > also use KeyQuery on WindowStores, even though there's no
> > common interface between WindowStore and KeyValueStore.
> > 
> > In other words, stores can support any queries that make
> > sense and _not_ support any queries that don't make sense.
> > This gets into your second question...
> > 
> > Sagar 2.
> > Very good question. Your experience with your KIP-614
> > contribution was one of the things that made me want to
> > revise IQ to begin with. It seems like there's a really
> > stark gap between how straightforward the proposal is to add
> > a new store operation, and then how hard it is to actually
> > implement a new operation, due to all those intervening
> > wrappers.
> > 
> > There are two categories of wrappers to worry about:
> > - Facades: These only exist to disallow access to write
> > APIs, which are exposed through IQ today but shouldn't be
> > called. These are simply unnecessary under IQv2, since we
> > only run queries instead of returning the whole store.
> > - Store Layers: This is what you provided examples of. We
> > have store layers that let us compose features like
> > de/serialization and metering, changelogging, caching, etc.
> > A nice thing about this design is that we mostly don't have
> > to worry at all about those wrapper layers at all. Each of
> > these stores would simply delegate any query to lower layers
> > unless there is something they need to do. In my POC, I
> > simply added a delegating implementation to
> > WrappedStateStore, which meant that I didn't need to touch
> > most of the wrappers when I added a new query.
> > 
> > Here's what I think future contributors will have to worry
> > about:
> > 1. The basic query execution in the base byte stores
> > (RocksDB and InMemory)
> > 2. The Caching stores IF they want the query to be served
> > from the cache
> > 3. The Metered stores IF some serialization needs to be done
> > for the query
> > 
> > And that's it! We should be able to add new queries without
> > touching any other store layer besides those, and each one
> > of those is involved because it has some specific reason to
> > be.
> > 
> > 
> > Thanks again, Patrick and Sagar! Please let me know if I
> > failed to address your questions, or if you have any more.
> > 
> > Thanks,
> > -John
> > 
> > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > Hi John,
> > > 
> > > Thanks for the great writeup! Couple of things I wanted to bring up(may
> > or
> > > mayn't be relevant):
> > > 
> > > 1) The sample implementation that you have presented for KeyQuery is very
> > > helpful. One thing which may be added to it is how it connects to the
> > > KeyValue.get(key) method. That's something that atleast I couldn't
> > totally
> > > figure out-not sure about others though. I understand that it is out of
> > > scope of th KIP to explain for every query that IQ supports but one
> > > implementation just to get a sense of how the changes would feel like.
> > > 2) The other thing that I wanted to know is that StateStore on it's own
> > has
> > > a lot of implementations and some of them are wrappers, So at what levels
> > > do users need to implement the query methods? Like a MeteredKeyValueStore
> > > wraps RocksDbStore and calls it internally through a wrapped call. As per
> > > the new changes, how would the scheme of things look like for the same
> > > KeyQuery?
> > > 
> > > Thanks!
> > > Sagar.
> > > 
> > > 
> > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > <ps...@confluent.io.invalid>
> > > wrote:
> > > 
> > > > Hi John,
> > > > 
> > > > Thanks for submitting the KIP! One question I have is, assuming one
> > > > instantiates InteractiveQueryRequest via withQuery, and then later
> > calls
> > > > getPositionBound, what will the result be? Also I noticed the Position
> > > > returning method is in InteractiveQueryRequest and
> > InteractiveQueryResult
> > > > is named differently, any particular reason?
> > > > 
> > > > Best,
> > > >   Patrick
> > > > 
> > > > 
> > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org>
> > wrote:
> > > > 
> > > > > Thanks for taking a look, Sophie!
> > > > > 
> > > > > Ah, that was a revision error. I had initially been planning
> > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > fetch all partitions, but then decided it was needlessly
> > > > > complex and changed it to the current proposal with two
> > > > > methods:
> > > > > 
> > > > > boolean isAllPartitions();
> > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > exception if it's an "all partitions" request).
> > > > > 
> > > > > I've corrected the javadoc and also documented the
> > > > > exception.
> > > > > 
> > > > > Thanks!
> > > > > -John
> > > > > 
> > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > wrote:
> > > > > > Thanks John, I've been looking forward to this for a while now. It
> > > > > > was pretty horrifying to learn
> > > > > > how present-day IQ works  (or rather, doesn't work) with custom
> > state
> > > > > > stores :/
> > > > > > 
> > > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> > the #
> > > > > > getPartitions
> > > > > > method has a return type of Set<Integer>, but the javadocs refer to
> > > > > Optional.
> > > > > > Not
> > > > > > sure which is intended for this API, but if is supposed to be the
> > > > return
> > > > > > type, do you perhaps
> > > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> > set)
> > > > > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > > > > > 
> > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcephei@apache.org
> > > 
> > > > > wrote:
> > > > > > 
> > > > > > > Hello again, all,
> > > > > > > 
> > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > 
> > > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > > Monday.
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > -John
> > > > > > > 
> > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > Hello all,
> > > > > > > > 
> > > > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > > > 
> > > > > > > > The proposal is here:
> > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > 
> > > > > > > > I look forward to your feedback!
> > > > > > > > 
> > > > > > > > Thank you,
> > > > > > > > -John
> > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > 
> > 
> > 
> 


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Guozhang Wang <wa...@gmail.com>.
Hello John,

Great, great, great writeup! :) And thank you for bringing this up finally.
I have made a pass on the KIP as well as the POC PR of it, here are some
initial thoughts:

First are some meta ones:

1. Today the serdes do not only happen at the metered-store layer,
unfortunately. For windowed / sessioned stores, and also some newly added
ones for stream-stream joins that are optimized for time-based range
queries, for example, the serdes are actually composite at multiple layers.
And the queries on the outer interface are also translated with serde
wrapped / stripped along the way in layers. To be more specific, today our
store hierarchy is like this:

metered * -> cached -> logged * -> formatted * (e.g. segmenged,
list-valued) -> inner (rocksdb, in-memory)

and serdes today could happen on the layers with * above, where each layer
is stuffing a bit more as prefix/suffix into the query bytes. This is not
really by design or ideal, but a result of history accumulated tech debts..
There's a related JIRA ticket for it:
https://issues.apache.org/jira/browse/KAFKA-13286. I guess my point is that
we need to be a bit careful regarding how to implement the
`KafkaStreams#serdesForStore(storeName)`, as we may expect some bumpy roads
moving forward.

2. Related to 1 above, I think we cannot always delegate the `query()`
implementation to the `inner` store layer, since some serde, or even some
computation logic happens at the outer, especially the `formatted` layer.
For example, besides the cached layer, the `formatted` layer also needs to
make sure the `query` object is being appropriately translated before
handing it off downstreams to the inner store, and also need to translate
the `queryResult` a bit while handing it upwards in the hierarchy.

3. As we add more query types in the IQv2, the inner store's `query`
instantiation may be getting very clumsy with a large "switch" condition on
all the possible query types. Although custom stores could consider only
supporting a few, having the `default` case to ignore all others, built-in
stores may still need to exhaust all possible types. I'm wondering if it's
a good trade-off to make `Query` be more restricted on extensibility to
have less exploding query type space, e.g. if a Query can only be extended
with some predefined dimensions like:

* query-field: key, non-key (some field extractor from the value bytes need
to be provided)
* query-scope: single, range
* query-match-type (only be useful for a range scope): prefix-match (e.g.
for a range key query, the provided is only a prefix, and all keys
containing this prefix should be returned), full-match
* query-value-type: object, raw-bytes

4. What's the expected usage for the execution info? Is it only for logging
purposes? If yes then I think not enforcing any string format is fine, that
the store layers can just attach any string information that they feel
useful.

5. I do not find any specific proposals for exception handling, what would
that look like? E.g. besides all the expected error cases like non-active,
how would we communicate other unexpected error cases such as store closed,
IO error, bad query parameters, etc?

6. Since we do not deprecate any existing APIs in this KIP, it's a bit hard
for readers to understand what is eventually going to be covered by IQv2.
For example, we know that eventually `KafkaStreams#store` would be gone,
but what about `KafkaStreams#queryMetadataForKey`, and
`#streamsMetadataForStore`, and also `allLocalStorePartitionLags`? I think
it would be great to mention the end world state with IQv2 even if the KIP
itself would not deprecate anything yet.

7. It seems people are still a bit confused about the
"Position/PositionBound" topics, and personally I think it's okay to
exclude them in this KIP just to keep its (already large) scope smaller.
Also after we started implementing the KIP in full, we may have learned new
things while fighting the details in the weeds, and that would be a better
timing for us to consider new parameters such as bounds, but also caching
bypassing, and other potential features as well.

Some minor ones:

8. What about just naming the new classes as `StateQueryRequest/Result`, or
`StoreQueryRequest/Result`? The word "interactive" is for describing its
semantics in docs, but I feel for class names we can use a more meaningful
prefix.

9. Should the RawKeyQuery be extending `KeyQuery<byte[]>`, or directly
implementing `Query<byte[]`>?

10. Why do we need the new class "InteractiveQuerySerdes" along with
existing classes? In your PR it seems just using `StateSerdes` directly.

11. Why do we have a new template type "R" in the QueryResult class in
addition to "<K, V>"? Should R always be equal to V?

12. Related to 10/11 above, what about letting the QueryResult to always be
returning values in raw bytes, along with the serdes? And then it's up to
the callers whether they want the bytes to be immediately deserialized or
want them to be written somewhere and deserialized later? More specifically
we would only have a single function as KafkaStreams#query, and the
QueryResult would be:

InteractiveQueryResult {
  public InteractiveQueryResult(Map<Integer /*partition*/,
QueryResult<byte[]>> partitionResults);

...

  public StateSerdes<K, V> serdes();
}

And then the result itself can also provide some built-in functions to do
the deser upon returning results, so that user's code would not get more
complicated. The benefit is that we end up with a single function in
`KafkaStreams`, and the inner store always only need to implement the raw
query types. Of course doing this would not be so easy given the fact
described in 1) above, but I feel this would be a good way to first
abstract away this tech debt, and then later resolve it to a single place.

---------------

Again, congrats on the very nice proposal! Let me know what you think about
my comments.

Guozhang


On Mon, Nov 15, 2021 at 2:52 PM John Roesler <vv...@apache.org> wrote:

> Hi Patrick and Sagar,
>
> Thanks for the feedback! I'll just break out the questions
> and address them one at a time.
>
> Patrick 1.
> The default bound that I'm proposing is only to let active
> tasks answer queries (which is also the default with IQ
> today). Therefore, calling getPositionBound() would return a
> PositionBound for which isLatest() is true.
>
> Patrick 2.
> I might have missed something in revision, but I'm not sure
> what you're referring to exactly when you say they are
> different. The IQRequest only has a PositionBound, and the
> IQResponse only has a (concrete) Position, so I think they
> are named accordingly (getPositionBound and getPosition). Am
> I overlooking what you are talking about?
>
> Sagar 1.
> I think you're talking about the KeyValueStore#get(key)
> method? This is a really good question. I went ahead and
> dropped in an addendum to the KeyQuery example to show how
> you would run the query in today's API. Here's a caracature
> of the two APIS:
>
> current:
>   KeyValueStore store = kafkaStreams.store(
>     "mystore",
>     keyValueStore())
>   int value = store.get(key);
>
> proposed:
>   int value = kafkaStreams.query(
>     "mystore",
>     KeyQuery.withKey(key));
>
> So, today we first get the store interface and then we
> invoke the method, and under the proposal, we would instead
> just ask KafkaStreams to execute the query on the store. In
> addition to all the other stuff I said in the motivation,
> one thing I think is neat about this API is that it means we
> can re-use queries across stores. So, for example, we could
> also use KeyQuery on WindowStores, even though there's no
> common interface between WindowStore and KeyValueStore.
>
> In other words, stores can support any queries that make
> sense and _not_ support any queries that don't make sense.
> This gets into your second question...
>
> Sagar 2.
> Very good question. Your experience with your KIP-614
> contribution was one of the things that made me want to
> revise IQ to begin with. It seems like there's a really
> stark gap between how straightforward the proposal is to add
> a new store operation, and then how hard it is to actually
> implement a new operation, due to all those intervening
> wrappers.
>
> There are two categories of wrappers to worry about:
> - Facades: These only exist to disallow access to write
> APIs, which are exposed through IQ today but shouldn't be
> called. These are simply unnecessary under IQv2, since we
> only run queries instead of returning the whole store.
> - Store Layers: This is what you provided examples of. We
> have store layers that let us compose features like
> de/serialization and metering, changelogging, caching, etc.
> A nice thing about this design is that we mostly don't have
> to worry at all about those wrapper layers at all. Each of
> these stores would simply delegate any query to lower layers
> unless there is something they need to do. In my POC, I
> simply added a delegating implementation to
> WrappedStateStore, which meant that I didn't need to touch
> most of the wrappers when I added a new query.
>
> Here's what I think future contributors will have to worry
> about:
> 1. The basic query execution in the base byte stores
> (RocksDB and InMemory)
> 2. The Caching stores IF they want the query to be served
> from the cache
> 3. The Metered stores IF some serialization needs to be done
> for the query
>
> And that's it! We should be able to add new queries without
> touching any other store layer besides those, and each one
> of those is involved because it has some specific reason to
> be.
>
>
> Thanks again, Patrick and Sagar! Please let me know if I
> failed to address your questions, or if you have any more.
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > Hi John,
> >
> > Thanks for the great writeup! Couple of things I wanted to bring up(may
> or
> > mayn't be relevant):
> >
> > 1) The sample implementation that you have presented for KeyQuery is very
> > helpful. One thing which may be added to it is how it connects to the
> > KeyValue.get(key) method. That's something that atleast I couldn't
> totally
> > figure out-not sure about others though. I understand that it is out of
> > scope of th KIP to explain for every query that IQ supports but one
> > implementation just to get a sense of how the changes would feel like.
> > 2) The other thing that I wanted to know is that StateStore on it's own
> has
> > a lot of implementations and some of them are wrappers, So at what levels
> > do users need to implement the query methods? Like a MeteredKeyValueStore
> > wraps RocksDbStore and calls it internally through a wrapped call. As per
> > the new changes, how would the scheme of things look like for the same
> > KeyQuery?
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> <ps...@confluent.io.invalid>
> > wrote:
> >
> > > Hi John,
> > >
> > > Thanks for submitting the KIP! One question I have is, assuming one
> > > instantiates InteractiveQueryRequest via withQuery, and then later
> calls
> > > getPositionBound, what will the result be? Also I noticed the Position
> > > returning method is in InteractiveQueryRequest and
> InteractiveQueryResult
> > > is named differently, any particular reason?
> > >
> > > Best,
> > >   Patrick
> > >
> > >
> > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org>
> wrote:
> > >
> > > > Thanks for taking a look, Sophie!
> > > >
> > > > Ah, that was a revision error. I had initially been planning
> > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > fetch all partitions, but then decided it was needlessly
> > > > complex and changed it to the current proposal with two
> > > > methods:
> > > >
> > > > boolean isAllPartitions();
> > > > Set<Integer> getPartitions(); (which would throw an
> > > > exception if it's an "all partitions" request).
> > > >
> > > > I've corrected the javadoc and also documented the
> > > > exception.
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > wrote:
> > > > > Thanks John, I've been looking forward to this for a while now. It
> > > > > was pretty horrifying to learn
> > > > > how present-day IQ works  (or rather, doesn't work) with custom
> state
> > > > > stores :/
> > > > >
> > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> the #
> > > > > getPartitions
> > > > > method has a return type of Set<Integer>, but the javadocs refer to
> > > > Optional.
> > > > > Not
> > > > > sure which is intended for this API, but if is supposed to be the
> > > return
> > > > > type, do you perhaps
> > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> set)
> > > > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > > > >
> > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcephei@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Hello again, all,
> > > > > >
> > > > > > Just bumping this discussion on a new, more flexible
> > > > > > Interactive Query API in Kafka Streams.
> > > > > >
> > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > Monday.
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > >
> > > > > > > The proposal is here:
> > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > >
> > > > > > > I look forward to your feedback!
> > > > > > >
> > > > > > > Thank you,
> > > > > > > -John
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > >
>
>

-- 
-- Guozhang

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Thanks for the reply, Sagar,

Thanks for bringing up the point about documentation, I do
think it would be a great idea for us to add a section to
the IQ doc page that's basically a "store extension guide"
that gives an overview of how to implement custom queries
and custom stores. That would help people see how to go
about extending Streams to meet their own needs, and also
how to put together a PR to add new queries to Kafka Streams
if/when they want to contribute their new queries upstream.

I will mention that when I make my next batch of updates to
the KIP (hopefully today).



Regarding remote query, the short answer is that, no, this
KIP doesn't imclude any new remote query capabilities.

I have mulled over remote queries for a while now. On one
hand, I would be really cool if Streams provided that
functionality natively. On the other hand, it introduces an
entirely new client-to-client communication pattern, which
doesn't exist anywhere in Apache Kafka today. I'm worried
that such an expansion would open Pandora's box in terms of
the complexity of configuring Streams, security models, etc.
It's possible, if IQ becomes a much more significant part of
Streams's capabilities, that the benefits of implementing
remote query could one day overcome the costs, but it
doesn't seem like that day is today.

That's the main reason I've held off from proposing remote
query capabilities in the past. Specifically for this KIP,
it's just outside the scope; this KIP is really focused on
improving the framework for executing local queries.

Thanks again!
-John

On Wed, 2021-11-17 at 22:09 +0530, Sagar wrote:
> Thanks John for answering the 2 questions. Pt #1 makes sense to me now.
> 
> Regarding Pt #2, first of all thanks for bringing up KIP-614 :D I did learn
> about the interfaces the hard way and probably due to that, the PR really
> stretched a lot. Having said that, the point that you mentioned about any
> future implementations needing to worry about the base stores, caching and
> metered stores, would it make sense to add them explicitly to the KIP and
> also to Javadocs if possible? That would guide the future contributors.
> WDYT?
> 
> The other question I have is (may be irrelevant) but with these changes, is
> there going to be any impact on remote state store querying capabilities?
> 
> Thanks!
> Sagar.
> 
> On Tue, Nov 16, 2021 at 4:22 AM John Roesler <vv...@apache.org> wrote:
> 
> > Hi Patrick and Sagar,
> > 
> > Thanks for the feedback! I'll just break out the questions
> > and address them one at a time.
> > 
> > Patrick 1.
> > The default bound that I'm proposing is only to let active
> > tasks answer queries (which is also the default with IQ
> > today). Therefore, calling getPositionBound() would return a
> > PositionBound for which isLatest() is true.
> > 
> > Patrick 2.
> > I might have missed something in revision, but I'm not sure
> > what you're referring to exactly when you say they are
> > different. The IQRequest only has a PositionBound, and the
> > IQResponse only has a (concrete) Position, so I think they
> > are named accordingly (getPositionBound and getPosition). Am
> > I overlooking what you are talking about?
> > 
> > Sagar 1.
> > I think you're talking about the KeyValueStore#get(key)
> > method? This is a really good question. I went ahead and
> > dropped in an addendum to the KeyQuery example to show how
> > you would run the query in today's API. Here's a caracature
> > of the two APIS:
> > 
> > current:
> >   KeyValueStore store = kafkaStreams.store(
> >     "mystore",
> >     keyValueStore())
> >   int value = store.get(key);
> > 
> > proposed:
> >   int value = kafkaStreams.query(
> >     "mystore",
> >     KeyQuery.withKey(key));
> > 
> > So, today we first get the store interface and then we
> > invoke the method, and under the proposal, we would instead
> > just ask KafkaStreams to execute the query on the store. In
> > addition to all the other stuff I said in the motivation,
> > one thing I think is neat about this API is that it means we
> > can re-use queries across stores. So, for example, we could
> > also use KeyQuery on WindowStores, even though there's no
> > common interface between WindowStore and KeyValueStore.
> > 
> > In other words, stores can support any queries that make
> > sense and _not_ support any queries that don't make sense.
> > This gets into your second question...
> > 
> > Sagar 2.
> > Very good question. Your experience with your KIP-614
> > contribution was one of the things that made me want to
> > revise IQ to begin with. It seems like there's a really
> > stark gap between how straightforward the proposal is to add
> > a new store operation, and then how hard it is to actually
> > implement a new operation, due to all those intervening
> > wrappers.
> > 
> > There are two categories of wrappers to worry about:
> > - Facades: These only exist to disallow access to write
> > APIs, which are exposed through IQ today but shouldn't be
> > called. These are simply unnecessary under IQv2, since we
> > only run queries instead of returning the whole store.
> > - Store Layers: This is what you provided examples of. We
> > have store layers that let us compose features like
> > de/serialization and metering, changelogging, caching, etc.
> > A nice thing about this design is that we mostly don't have
> > to worry at all about those wrapper layers at all. Each of
> > these stores would simply delegate any query to lower layers
> > unless there is something they need to do. In my POC, I
> > simply added a delegating implementation to
> > WrappedStateStore, which meant that I didn't need to touch
> > most of the wrappers when I added a new query.
> > 
> > Here's what I think future contributors will have to worry
> > about:
> > 1. The basic query execution in the base byte stores
> > (RocksDB and InMemory)
> > 2. The Caching stores IF they want the query to be served
> > from the cache
> > 3. The Metered stores IF some serialization needs to be done
> > for the query
> > 
> > And that's it! We should be able to add new queries without
> > touching any other store layer besides those, and each one
> > of those is involved because it has some specific reason to
> > be.
> > 
> > 
> > Thanks again, Patrick and Sagar! Please let me know if I
> > failed to address your questions, or if you have any more.
> > 
> > Thanks,
> > -John
> > 
> > On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > > Hi John,
> > > 
> > > Thanks for the great writeup! Couple of things I wanted to bring up(may
> > or
> > > mayn't be relevant):
> > > 
> > > 1) The sample implementation that you have presented for KeyQuery is very
> > > helpful. One thing which may be added to it is how it connects to the
> > > KeyValue.get(key) method. That's something that atleast I couldn't
> > totally
> > > figure out-not sure about others though. I understand that it is out of
> > > scope of th KIP to explain for every query that IQ supports but one
> > > implementation just to get a sense of how the changes would feel like.
> > > 2) The other thing that I wanted to know is that StateStore on it's own
> > has
> > > a lot of implementations and some of them are wrappers, So at what levels
> > > do users need to implement the query methods? Like a MeteredKeyValueStore
> > > wraps RocksDbStore and calls it internally through a wrapped call. As per
> > > the new changes, how would the scheme of things look like for the same
> > > KeyQuery?
> > > 
> > > Thanks!
> > > Sagar.
> > > 
> > > 
> > > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> > <ps...@confluent.io.invalid>
> > > wrote:
> > > 
> > > > Hi John,
> > > > 
> > > > Thanks for submitting the KIP! One question I have is, assuming one
> > > > instantiates InteractiveQueryRequest via withQuery, and then later
> > calls
> > > > getPositionBound, what will the result be? Also I noticed the Position
> > > > returning method is in InteractiveQueryRequest and
> > InteractiveQueryResult
> > > > is named differently, any particular reason?
> > > > 
> > > > Best,
> > > >   Patrick
> > > > 
> > > > 
> > > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org>
> > wrote:
> > > > 
> > > > > Thanks for taking a look, Sophie!
> > > > > 
> > > > > Ah, that was a revision error. I had initially been planning
> > > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > > fetch all partitions, but then decided it was needlessly
> > > > > complex and changed it to the current proposal with two
> > > > > methods:
> > > > > 
> > > > > boolean isAllPartitions();
> > > > > Set<Integer> getPartitions(); (which would throw an
> > > > > exception if it's an "all partitions" request).
> > > > > 
> > > > > I've corrected the javadoc and also documented the
> > > > > exception.
> > > > > 
> > > > > Thanks!
> > > > > -John
> > > > > 
> > > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > > wrote:
> > > > > > Thanks John, I've been looking forward to this for a while now. It
> > > > > > was pretty horrifying to learn
> > > > > > how present-day IQ works  (or rather, doesn't work) with custom
> > state
> > > > > > stores :/
> > > > > > 
> > > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> > the #
> > > > > > getPartitions
> > > > > > method has a return type of Set<Integer>, but the javadocs refer to
> > > > > Optional.
> > > > > > Not
> > > > > > sure which is intended for this API, but if is supposed to be the
> > > > return
> > > > > > type, do you perhaps
> > > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> > set)
> > > > > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > > > > > 
> > > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcephei@apache.org
> > > 
> > > > > wrote:
> > > > > > 
> > > > > > > Hello again, all,
> > > > > > > 
> > > > > > > Just bumping this discussion on a new, more flexible
> > > > > > > Interactive Query API in Kafka Streams.
> > > > > > > 
> > > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > > Monday.
> > > > > > > 
> > > > > > > Thanks!
> > > > > > > -John
> > > > > > > 
> > > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > > Hello all,
> > > > > > > > 
> > > > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > > > 
> > > > > > > > The proposal is here:
> > > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > > > 
> > > > > > > > I look forward to your feedback!
> > > > > > > > 
> > > > > > > > Thank you,
> > > > > > > > -John
> > > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > > 
> > 
> > 


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Sagar <sa...@gmail.com>.
Thanks John for answering the 2 questions. Pt #1 makes sense to me now.

Regarding Pt #2, first of all thanks for bringing up KIP-614 :D I did learn
about the interfaces the hard way and probably due to that, the PR really
stretched a lot. Having said that, the point that you mentioned about any
future implementations needing to worry about the base stores, caching and
metered stores, would it make sense to add them explicitly to the KIP and
also to Javadocs if possible? That would guide the future contributors.
WDYT?

The other question I have is (may be irrelevant) but with these changes, is
there going to be any impact on remote state store querying capabilities?

Thanks!
Sagar.

On Tue, Nov 16, 2021 at 4:22 AM John Roesler <vv...@apache.org> wrote:

> Hi Patrick and Sagar,
>
> Thanks for the feedback! I'll just break out the questions
> and address them one at a time.
>
> Patrick 1.
> The default bound that I'm proposing is only to let active
> tasks answer queries (which is also the default with IQ
> today). Therefore, calling getPositionBound() would return a
> PositionBound for which isLatest() is true.
>
> Patrick 2.
> I might have missed something in revision, but I'm not sure
> what you're referring to exactly when you say they are
> different. The IQRequest only has a PositionBound, and the
> IQResponse only has a (concrete) Position, so I think they
> are named accordingly (getPositionBound and getPosition). Am
> I overlooking what you are talking about?
>
> Sagar 1.
> I think you're talking about the KeyValueStore#get(key)
> method? This is a really good question. I went ahead and
> dropped in an addendum to the KeyQuery example to show how
> you would run the query in today's API. Here's a caracature
> of the two APIS:
>
> current:
>   KeyValueStore store = kafkaStreams.store(
>     "mystore",
>     keyValueStore())
>   int value = store.get(key);
>
> proposed:
>   int value = kafkaStreams.query(
>     "mystore",
>     KeyQuery.withKey(key));
>
> So, today we first get the store interface and then we
> invoke the method, and under the proposal, we would instead
> just ask KafkaStreams to execute the query on the store. In
> addition to all the other stuff I said in the motivation,
> one thing I think is neat about this API is that it means we
> can re-use queries across stores. So, for example, we could
> also use KeyQuery on WindowStores, even though there's no
> common interface between WindowStore and KeyValueStore.
>
> In other words, stores can support any queries that make
> sense and _not_ support any queries that don't make sense.
> This gets into your second question...
>
> Sagar 2.
> Very good question. Your experience with your KIP-614
> contribution was one of the things that made me want to
> revise IQ to begin with. It seems like there's a really
> stark gap between how straightforward the proposal is to add
> a new store operation, and then how hard it is to actually
> implement a new operation, due to all those intervening
> wrappers.
>
> There are two categories of wrappers to worry about:
> - Facades: These only exist to disallow access to write
> APIs, which are exposed through IQ today but shouldn't be
> called. These are simply unnecessary under IQv2, since we
> only run queries instead of returning the whole store.
> - Store Layers: This is what you provided examples of. We
> have store layers that let us compose features like
> de/serialization and metering, changelogging, caching, etc.
> A nice thing about this design is that we mostly don't have
> to worry at all about those wrapper layers at all. Each of
> these stores would simply delegate any query to lower layers
> unless there is something they need to do. In my POC, I
> simply added a delegating implementation to
> WrappedStateStore, which meant that I didn't need to touch
> most of the wrappers when I added a new query.
>
> Here's what I think future contributors will have to worry
> about:
> 1. The basic query execution in the base byte stores
> (RocksDB and InMemory)
> 2. The Caching stores IF they want the query to be served
> from the cache
> 3. The Metered stores IF some serialization needs to be done
> for the query
>
> And that's it! We should be able to add new queries without
> touching any other store layer besides those, and each one
> of those is involved because it has some specific reason to
> be.
>
>
> Thanks again, Patrick and Sagar! Please let me know if I
> failed to address your questions, or if you have any more.
>
> Thanks,
> -John
>
> On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> > Hi John,
> >
> > Thanks for the great writeup! Couple of things I wanted to bring up(may
> or
> > mayn't be relevant):
> >
> > 1) The sample implementation that you have presented for KeyQuery is very
> > helpful. One thing which may be added to it is how it connects to the
> > KeyValue.get(key) method. That's something that atleast I couldn't
> totally
> > figure out-not sure about others though. I understand that it is out of
> > scope of th KIP to explain for every query that IQ supports but one
> > implementation just to get a sense of how the changes would feel like.
> > 2) The other thing that I wanted to know is that StateStore on it's own
> has
> > a lot of implementations and some of them are wrappers, So at what levels
> > do users need to implement the query methods? Like a MeteredKeyValueStore
> > wraps RocksDbStore and calls it internally through a wrapped call. As per
> > the new changes, how would the scheme of things look like for the same
> > KeyQuery?
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi
> <ps...@confluent.io.invalid>
> > wrote:
> >
> > > Hi John,
> > >
> > > Thanks for submitting the KIP! One question I have is, assuming one
> > > instantiates InteractiveQueryRequest via withQuery, and then later
> calls
> > > getPositionBound, what will the result be? Also I noticed the Position
> > > returning method is in InteractiveQueryRequest and
> InteractiveQueryResult
> > > is named differently, any particular reason?
> > >
> > > Best,
> > >   Patrick
> > >
> > >
> > > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org>
> wrote:
> > >
> > > > Thanks for taking a look, Sophie!
> > > >
> > > > Ah, that was a revision error. I had initially been planning
> > > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > > fetch all partitions, but then decided it was needlessly
> > > > complex and changed it to the current proposal with two
> > > > methods:
> > > >
> > > > boolean isAllPartitions();
> > > > Set<Integer> getPartitions(); (which would throw an
> > > > exception if it's an "all partitions" request).
> > > >
> > > > I've corrected the javadoc and also documented the
> > > > exception.
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > > wrote:
> > > > > Thanks John, I've been looking forward to this for a while now. It
> > > > > was pretty horrifying to learn
> > > > > how present-day IQ works  (or rather, doesn't work) with custom
> state
> > > > > stores :/
> > > > >
> > > > > One minor cosmetic point, In the InteractiveQueryRequest class,
> the #
> > > > > getPartitions
> > > > > method has a return type of Set<Integer>, but the javadocs refer to
> > > > Optional.
> > > > > Not
> > > > > sure which is intended for this API, but if is supposed to be the
> > > return
> > > > > type, do you perhaps
> > > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty
> set)
> > > > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > > > >
> > > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vvcephei@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Hello again, all,
> > > > > >
> > > > > > Just bumping this discussion on a new, more flexible
> > > > > > Interactive Query API in Kafka Streams.
> > > > > >
> > > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > > Monday.
> > > > > >
> > > > > > Thanks!
> > > > > > -John
> > > > > >
> > > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > > Hello all,
> > > > > > >
> > > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > >
> > > > > > > The proposal is here:
> > > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > >
> > > > > > > I look forward to your feedback!
> > > > > > >
> > > > > > > Thank you,
> > > > > > > -John
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > >
> > > >
> > > >
> > >
>
>

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Hi Patrick and Sagar,

Thanks for the feedback! I'll just break out the questions
and address them one at a time.

Patrick 1.
The default bound that I'm proposing is only to let active
tasks answer queries (which is also the default with IQ
today). Therefore, calling getPositionBound() would return a
PositionBound for which isLatest() is true.

Patrick 2.
I might have missed something in revision, but I'm not sure
what you're referring to exactly when you say they are
different. The IQRequest only has a PositionBound, and the
IQResponse only has a (concrete) Position, so I think they
are named accordingly (getPositionBound and getPosition). Am
I overlooking what you are talking about?

Sagar 1.
I think you're talking about the KeyValueStore#get(key)
method? This is a really good question. I went ahead and
dropped in an addendum to the KeyQuery example to show how
you would run the query in today's API. Here's a caracature
of the two APIS:

current:
  KeyValueStore store = kafkaStreams.store(
    "mystore",
    keyValueStore())
  int value = store.get(key);

proposed:
  int value = kafkaStreams.query(
    "mystore",
    KeyQuery.withKey(key));

So, today we first get the store interface and then we
invoke the method, and under the proposal, we would instead
just ask KafkaStreams to execute the query on the store. In
addition to all the other stuff I said in the motivation,
one thing I think is neat about this API is that it means we
can re-use queries across stores. So, for example, we could
also use KeyQuery on WindowStores, even though there's no
common interface between WindowStore and KeyValueStore.

In other words, stores can support any queries that make
sense and _not_ support any queries that don't make sense.
This gets into your second question...

Sagar 2.
Very good question. Your experience with your KIP-614
contribution was one of the things that made me want to
revise IQ to begin with. It seems like there's a really
stark gap between how straightforward the proposal is to add
a new store operation, and then how hard it is to actually
implement a new operation, due to all those intervening
wrappers.

There are two categories of wrappers to worry about:
- Facades: These only exist to disallow access to write
APIs, which are exposed through IQ today but shouldn't be
called. These are simply unnecessary under IQv2, since we
only run queries instead of returning the whole store.
- Store Layers: This is what you provided examples of. We
have store layers that let us compose features like
de/serialization and metering, changelogging, caching, etc.
A nice thing about this design is that we mostly don't have
to worry at all about those wrapper layers at all. Each of
these stores would simply delegate any query to lower layers
unless there is something they need to do. In my POC, I
simply added a delegating implementation to
WrappedStateStore, which meant that I didn't need to touch
most of the wrappers when I added a new query.

Here's what I think future contributors will have to worry
about:
1. The basic query execution in the base byte stores
(RocksDB and InMemory)
2. The Caching stores IF they want the query to be served
from the cache
3. The Metered stores IF some serialization needs to be done
for the query

And that's it! We should be able to add new queries without
touching any other store layer besides those, and each one
of those is involved because it has some specific reason to
be.


Thanks again, Patrick and Sagar! Please let me know if I
failed to address your questions, or if you have any more.

Thanks,
-John

On Mon, 2021-11-15 at 22:37 +0530, Sagar wrote:
> Hi John,
> 
> Thanks for the great writeup! Couple of things I wanted to bring up(may or
> mayn't be relevant):
> 
> 1) The sample implementation that you have presented for KeyQuery is very
> helpful. One thing which may be added to it is how it connects to the
> KeyValue.get(key) method. That's something that atleast I couldn't totally
> figure out-not sure about others though. I understand that it is out of
> scope of th KIP to explain for every query that IQ supports but one
> implementation just to get a sense of how the changes would feel like.
> 2) The other thing that I wanted to know is that StateStore on it's own has
> a lot of implementations and some of them are wrappers, So at what levels
> do users need to implement the query methods? Like a MeteredKeyValueStore
> wraps RocksDbStore and calls it internally through a wrapped call. As per
> the new changes, how would the scheme of things look like for the same
> KeyQuery?
> 
> Thanks!
> Sagar.
> 
> 
> On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi <ps...@confluent.io.invalid>
> wrote:
> 
> > Hi John,
> > 
> > Thanks for submitting the KIP! One question I have is, assuming one
> > instantiates InteractiveQueryRequest via withQuery, and then later calls
> > getPositionBound, what will the result be? Also I noticed the Position
> > returning method is in InteractiveQueryRequest and InteractiveQueryResult
> > is named differently, any particular reason?
> > 
> > Best,
> >   Patrick
> > 
> > 
> > On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org> wrote:
> > 
> > > Thanks for taking a look, Sophie!
> > > 
> > > Ah, that was a revision error. I had initially been planning
> > > an Optional<Set<Integer>> with Optional.empty() meaning to
> > > fetch all partitions, but then decided it was needlessly
> > > complex and changed it to the current proposal with two
> > > methods:
> > > 
> > > boolean isAllPartitions();
> > > Set<Integer> getPartitions(); (which would throw an
> > > exception if it's an "all partitions" request).
> > > 
> > > I've corrected the javadoc and also documented the
> > > exception.
> > > 
> > > Thanks!
> > > -John
> > > 
> > > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > > wrote:
> > > > Thanks John, I've been looking forward to this for a while now. It
> > > > was pretty horrifying to learn
> > > > how present-day IQ works  (or rather, doesn't work) with custom state
> > > > stores :/
> > > > 
> > > > One minor cosmetic point, In the InteractiveQueryRequest class, the #
> > > > getPartitions
> > > > method has a return type of Set<Integer>, but the javadocs refer to
> > > Optional.
> > > > Not
> > > > sure which is intended for this API, but if is supposed to be the
> > return
> > > > type, do you perhaps
> > > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty set)
> > > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > > > 
> > > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vv...@apache.org>
> > > wrote:
> > > > 
> > > > > Hello again, all,
> > > > > 
> > > > > Just bumping this discussion on a new, more flexible
> > > > > Interactive Query API in Kafka Streams.
> > > > > 
> > > > > If there are no concerns, I'll go ahead and call a vote on
> > > > > Monday.
> > > > > 
> > > > > Thanks!
> > > > > -John
> > > > > 
> > > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > > Hello all,
> > > > > > 
> > > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > > > 
> > > > > > The proposal is here:
> > > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > > > 
> > > > > > I look forward to your feedback!
> > > > > > 
> > > > > > Thank you,
> > > > > > -John
> > > > > > 
> > > > > 
> > > > > 
> > > > > 
> > > 
> > > 
> > > 
> > 


Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Sagar <sa...@gmail.com>.
Hi John,

Thanks for the great writeup! Couple of things I wanted to bring up(may or
mayn't be relevant):

1) The sample implementation that you have presented for KeyQuery is very
helpful. One thing which may be added to it is how it connects to the
KeyValue.get(key) method. That's something that atleast I couldn't totally
figure out-not sure about others though. I understand that it is out of
scope of th KIP to explain for every query that IQ supports but one
implementation just to get a sense of how the changes would feel like.
2) The other thing that I wanted to know is that StateStore on it's own has
a lot of implementations and some of them are wrappers, So at what levels
do users need to implement the query methods? Like a MeteredKeyValueStore
wraps RocksDbStore and calls it internally through a wrapped call. As per
the new changes, how would the scheme of things look like for the same
KeyQuery?

Thanks!
Sagar.


On Mon, Nov 15, 2021 at 6:20 PM Patrick Stuedi <ps...@confluent.io.invalid>
wrote:

> Hi John,
>
> Thanks for submitting the KIP! One question I have is, assuming one
> instantiates InteractiveQueryRequest via withQuery, and then later calls
> getPositionBound, what will the result be? Also I noticed the Position
> returning method is in InteractiveQueryRequest and InteractiveQueryResult
> is named differently, any particular reason?
>
> Best,
>   Patrick
>
>
> On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org> wrote:
>
> > Thanks for taking a look, Sophie!
> >
> > Ah, that was a revision error. I had initially been planning
> > an Optional<Set<Integer>> with Optional.empty() meaning to
> > fetch all partitions, but then decided it was needlessly
> > complex and changed it to the current proposal with two
> > methods:
> >
> > boolean isAllPartitions();
> > Set<Integer> getPartitions(); (which would throw an
> > exception if it's an "all partitions" request).
> >
> > I've corrected the javadoc and also documented the
> > exception.
> >
> > Thanks!
> > -John
> >
> > On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> > wrote:
> > > Thanks John, I've been looking forward to this for a while now. It
> > > was pretty horrifying to learn
> > > how present-day IQ works  (or rather, doesn't work) with custom state
> > > stores :/
> > >
> > > One minor cosmetic point, In the InteractiveQueryRequest class, the #
> > > getPartitions
> > > method has a return type of Set<Integer>, but the javadocs refer to
> > Optional.
> > > Not
> > > sure which is intended for this API, but if is supposed to be the
> return
> > > type, do you perhaps
> > > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty set)
> > > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> > >
> > > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vv...@apache.org>
> > wrote:
> > >
> > > > Hello again, all,
> > > >
> > > > Just bumping this discussion on a new, more flexible
> > > > Interactive Query API in Kafka Streams.
> > > >
> > > > If there are no concerns, I'll go ahead and call a vote on
> > > > Monday.
> > > >
> > > > Thanks!
> > > > -John
> > > >
> > > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > > Hello all,
> > > > >
> > > > > I'd like to start the discussion for KIP-796, which proposes
> > > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > > >
> > > > > The proposal is here:
> > > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > > >
> > > > > I look forward to your feedback!
> > > > >
> > > > > Thank you,
> > > > > -John
> > > > >
> > > >
> > > >
> > > >
> >
> >
> >
>

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Patrick Stuedi <ps...@confluent.io.INVALID>.
Hi John,

Thanks for submitting the KIP! One question I have is, assuming one
instantiates InteractiveQueryRequest via withQuery, and then later calls
getPositionBound, what will the result be? Also I noticed the Position
returning method is in InteractiveQueryRequest and InteractiveQueryResult
is named differently, any particular reason?

Best,
  Patrick


On Fri, Nov 12, 2021 at 12:29 AM John Roesler <vv...@apache.org> wrote:

> Thanks for taking a look, Sophie!
>
> Ah, that was a revision error. I had initially been planning
> an Optional<Set<Integer>> with Optional.empty() meaning to
> fetch all partitions, but then decided it was needlessly
> complex and changed it to the current proposal with two
> methods:
>
> boolean isAllPartitions();
> Set<Integer> getPartitions(); (which would throw an
> exception if it's an "all partitions" request).
>
> I've corrected the javadoc and also documented the
> exception.
>
> Thanks!
> -John
>
> On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
> wrote:
> > Thanks John, I've been looking forward to this for a while now. It
> > was pretty horrifying to learn
> > how present-day IQ works  (or rather, doesn't work) with custom state
> > stores :/
> >
> > One minor cosmetic point, In the InteractiveQueryRequest class, the #
> > getPartitions
> > method has a return type of Set<Integer>, but the javadocs refer to
> Optional.
> > Not
> > sure which is intended for this API, but if is supposed to be the return
> > type, do you perhaps
> > mean for it to be  Optional.ofEmpty() and Optional.of(non-empty set)
> > rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> >
> > On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vv...@apache.org>
> wrote:
> >
> > > Hello again, all,
> > >
> > > Just bumping this discussion on a new, more flexible
> > > Interactive Query API in Kafka Streams.
> > >
> > > If there are no concerns, I'll go ahead and call a vote on
> > > Monday.
> > >
> > > Thanks!
> > > -John
> > >
> > > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > > Hello all,
> > > >
> > > > I'd like to start the discussion for KIP-796, which proposes
> > > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > >
> > > > The proposal is here:
> > > > https://cwiki.apache.org/confluence/x/34xnCw
> > > >
> > > > I look forward to your feedback!
> > > >
> > > > Thank you,
> > > > -John
> > > >
> > >
> > >
> > >
>
>
>

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Thanks for taking a look, Sophie!

Ah, that was a revision error. I had initially been planning
an Optional<Set<Integer>> with Optional.empty() meaning to
fetch all partitions, but then decided it was needlessly
complex and changed it to the current proposal with two
methods:

boolean isAllPartitions();
Set<Integer> getPartitions(); (which would throw an
exception if it's an "all partitions" request).

I've corrected the javadoc and also documented the
exception.

Thanks!
-John

On Thu, 2021-11-11 at 15:03 -0800, Sophie Blee-Goldman
wrote:
> Thanks John, I've been looking forward to this for a while now. It
> was pretty horrifying to learn
> how present-day IQ works  (or rather, doesn't work) with custom state
> stores :/
> 
> One minor cosmetic point, In the InteractiveQueryRequest class, the #
> getPartitions
> method has a return type of Set<Integer>, but the javadocs refer to Optional.
> Not
> sure which is intended for this API, but if is supposed to be the return
> type, do you perhaps
> mean for it to be  Optional.ofEmpty() and Optional.of(non-empty set)
> rather than Optional.of(empty set) and Optional.of(non-empty set) ?
> 
> On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vv...@apache.org> wrote:
> 
> > Hello again, all,
> > 
> > Just bumping this discussion on a new, more flexible
> > Interactive Query API in Kafka Streams.
> > 
> > If there are no concerns, I'll go ahead and call a vote on
> > Monday.
> > 
> > Thanks!
> > -John
> > 
> > On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > > Hello all,
> > > 
> > > I'd like to start the discussion for KIP-796, which proposes
> > > a revamp of the Interactive Query APIs in Kafka Streams.
> > > 
> > > The proposal is here:
> > > https://cwiki.apache.org/confluence/x/34xnCw
> > > 
> > > I look forward to your feedback!
> > > 
> > > Thank you,
> > > -John
> > > 
> > 
> > 
> > 



Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by Sophie Blee-Goldman <so...@confluent.io.INVALID>.
Thanks John, I've been looking forward to this for a while now. It
was pretty horrifying to learn
how present-day IQ works  (or rather, doesn't work) with custom state
stores :/

One minor cosmetic point, In the InteractiveQueryRequest class, the #
getPartitions
method has a return type of Set<Integer>, but the javadocs refer to Optional.
Not
sure which is intended for this API, but if is supposed to be the return
type, do you perhaps
mean for it to be  Optional.ofEmpty() and Optional.of(non-empty set)
rather than Optional.of(empty set) and Optional.of(non-empty set) ?

On Thu, Nov 11, 2021 at 12:03 PM John Roesler <vv...@apache.org> wrote:

> Hello again, all,
>
> Just bumping this discussion on a new, more flexible
> Interactive Query API in Kafka Streams.
>
> If there are no concerns, I'll go ahead and call a vote on
> Monday.
>
> Thanks!
> -John
>
> On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> > Hello all,
> >
> > I'd like to start the discussion for KIP-796, which proposes
> > a revamp of the Interactive Query APIs in Kafka Streams.
> >
> > The proposal is here:
> > https://cwiki.apache.org/confluence/x/34xnCw
> >
> > I look forward to your feedback!
> >
> > Thank you,
> > -John
> >
>
>
>

Re: [DISCUSS] KIP-796: Interactive Query v2

Posted by John Roesler <vv...@apache.org>.
Hello again, all,

Just bumping this discussion on a new, more flexible
Interactive Query API in Kafka Streams.

If there are no concerns, I'll go ahead and call a vote on
Monday.

Thanks!
-John

On Tue, 2021-11-09 at 17:37 -0600, John Roesler wrote:
> Hello all,
> 
> I'd like to start the discussion for KIP-796, which proposes
> a revamp of the Interactive Query APIs in Kafka Streams.
> 
> The proposal is here:
> https://cwiki.apache.org/confluence/x/34xnCw
> 
> I look forward to your feedback!
> 
> Thank you,
> -John
>