Posted to users@kafka.apache.org by Jan Filipiak <Ja...@trivago.com> on 2017/08/02 00:40:26 UTC

Re: [DISCUSS] Streams DSL/StateStore Refactoring

Hi all,

After some further discussion, the best way to show my idea of how it
should evolve would be a bigger mock/interface description.
The goal is to reduce the store-maintaining processors to only the
Aggregators and KTableSource, while having KTableSource optionally
materialized.

Introducing KTable::copy() will allow users to maintain state twice if
they really want to. KStream::join*() wasn't touched. I have never personally
used it, so I didn't feel comfortable enough touching it. I am currently
still making up my mind. None of the suggestions has made it queryable so
far. Guozhang's 'Buffered' idea seems ideal here.

Please have a look. Looking forward to your opinions.

Best Jan



On 21.06.2017 17:24, Eno Thereska wrote:
> (cc’ing user-list too)
>
> Given that we already have StateStoreSuppliers that are configurable using the fluent-like API, probably it’s worth discussing the other examples with joins and serdes first since those have many overloads and are in need of some TLC.
>
> So following your example, I guess you’d have something like:
> .join()
>     .withKeySerdes(…)
>     .withValueSerdes(…)
>     .withJoinType("outer")
>
> etc?
>
> I like the approach since it still remains declarative and it’d reduce the number of overloads by quite a bit.
>
> Eno
>
>> On Jun 21, 2017, at 3:37 PM, Damian Guy <da...@gmail.com> wrote:
>>
>> Hi,
>>
>> I'd like to get a discussion going around some of the API choices we've
>> made in the DSL, in particular those that relate to stateful operations
>> (though this could expand).
>> As it stands we lean heavily on overloaded methods in the API, e.g. there
>> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and I
>> feel it is only going to get worse as we add more optional params. In
>> particular we've had some requests to be able to turn caching off, or
>> change log configs, on a per-operator basis (note this can be done now if
>> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
>>
>> So this is a bit of an open question. How can we change the DSL overloads
>> so that it flows, is simple to use and understand, and is easily extended
>> in the future?
>>
>> One option would be to use a fluent API approach for providing the optional
>> params, so something like this:
>>
>> groupedStream.count()
>>    .withStoreName("name")
>>    .withCachingEnabled(false)
>>    .withLoggingEnabled(config)
>>    .table()
>>
>>
>>
>> Another option would be to provide a Builder to the count method, so it
>> would look something like this:
>> groupedStream.count(new CountBuilder("storeName").withCachingEnabled(false).build())
>>
>> Another option is to say: Hey we don't need this, what are you on about!
>>
>> The above has focussed on state store related overloads, but the same ideas
>> could be applied to joins etc, where we presently have many join methods
>> and many overloads.
>>
>> Anyway, I look forward to hearing your opinions.
>>
>> Thanks,
>> Damian
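
To make the options-object idea from the original proposal concrete, here is a minimal sketch in plain Java with no Kafka dependency. Everything in it (`CountOptions`, `named`, `describeCount`, the `retention.ms` entry) is a hypothetical name chosen for illustration and is not part of the Kafka Streams API; it only shows how one method taking an options object could stand in for many overloads.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class FluentOptionsSketch {

    /** Hypothetical immutable options holder; NOT part of the Kafka Streams API. */
    public static final class CountOptions {
        private final String storeName;
        private final boolean cachingEnabled;
        private final Map<String, String> logConfig;

        private CountOptions(String storeName, boolean cachingEnabled, Map<String, String> logConfig) {
            this.storeName = storeName;
            this.cachingEnabled = cachingEnabled;
            this.logConfig = logConfig;
        }

        /** Only the store name is required; every other option starts with a default. */
        public static CountOptions named(String storeName) {
            return new CountOptions(storeName, true, Collections.emptyMap());
        }

        /** Returns a copy with caching switched on or off. */
        public CountOptions withCachingEnabled(boolean enabled) {
            return new CountOptions(storeName, enabled, logConfig);
        }

        /** Returns a copy carrying a defensive, read-only copy of the log config. */
        public CountOptions withLoggingEnabled(Map<String, String> config) {
            return new CountOptions(storeName, cachingEnabled,
                    Collections.unmodifiableMap(new HashMap<>(config)));
        }

        public String storeName() { return storeName; }
        public boolean cachingEnabled() { return cachingEnabled; }
        public Map<String, String> logConfig() { return logConfig; }
    }

    /** A single entry point stands in for the many count(..) overloads. */
    public static String describeCount(CountOptions options) {
        return "count[store=" + options.storeName()
                + ", caching=" + options.cachingEnabled()
                + ", logConfig=" + options.logConfig() + "]";
    }

    public static void main(String[] args) {
        CountOptions options = CountOptions.named("name")
                .withCachingEnabled(false)
                .withLoggingEnabled(Collections.singletonMap("retention.ms", "60000"));
        System.out.println(describeCount(options));
    }
}
```

Adding a new optional parameter in this style means adding one `with...` method rather than doubling the number of overloads, which is the property the thread is arguing about.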


Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Jan Filipiak <Ja...@trivago.com>.
Replies inline, rather sparse for lack of time.

Sadly I can't agree with any of your arguments and I _hate_ how it's going
to look, but we can't have this discussion forever.

I think I explained everything in enough detail that my points make sense.
If someone is interested and has specific questions, they can always
approach me.

Otherwise I am just going to drink the kool-aid now. :(

Best Jan

On 08.08.2017 20:37, Guozhang Wang wrote:
> Hello Jan,
>
> Thanks for your feedback. Let me try to explain my points a bit more, since I
> think there is still some miscommunication here:
>
> Here are a few things I need to clarify for KIP-182 first:
>
> 1. KIP-182 is mainly about refactoring the public APIs, NOT about making any
> optimizations to the internal implementations. So we only care that these
> public API changes do not prevent us from optimizing the internal
> implementations in the near future.
>
> To give you a concrete example: as you mentioned, KTableValueGetterSupplier
> is NOT used in IQ, and a materialized physical store is always used
> today. Yes, that is true, and we do have plans to optimize this case soon;
> for example, it is still doable with the proposed KIP-182 to
> remove the physical materialized store and use KTableValueGetterSupplier to
> read from an upstream physical store and apply the optimizations. Another
> example you mentioned is the stream-stream join, where each stream is
> physically materialized into a store; we can definitely optimize this in
> the future to remove the physical materialized store and use something
> else, e.g. an in-memory buffer. Such optimizations are NOT blocked by the
> updated public APIs of KIP-182.
One of the big goals of the refactoring, at least, was to get rid of the
overloads, making new features easier to implement because one no longer
has to take care of all the overloads. Folding 2 overloads into 1 with a
Builder that has 2 ways of being built won't help much here.

Having the DSL express very closely what actually happens will only help
people avoid confusion. Having the store overload on every operation is
just plain confusing right now.

>
>
> 2. One concern you raise, that KIP-182 may actually block such
> optimizations, is that if users do specify a StateStoreSupplier then we
> cannot optimize that away. That is true, but that is by design: if users do
> specify a state store supplier in the Materialized API, that is equivalent to
> saying "forget about doing any smart things, library, just use what I give you".
> In other words, the above-mentioned optimizations can be applied if users
> do not enforce any specific StateStoreSupplier, for example in
>
> public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)
>
> i.e. the user only provides a store name, which acts like a handle token
> for IQ; then the library still has the freedom to do smart things
> internally, totally hidden from the users. It is similar to an
> RDBMS or some NoSQL stores like Hive / Cassandra: the store engine does
> not have the freedom to do query plan optimizations if users already
> enforce specs like join ordering, query plan generation rules, etc.
You call the same method with the builder built differently and it's
going to do different things. That is my definition of unintuitive. Also, the
code internally has to become dead ugly, as it needs to apply these
optimisations basically in the same method call or at the place where the
Builder is evaluated. This just cries out for ugly internal code. There is no
way this can become pretty.

>
>
> 3. About whether it is worthwhile to "not break the fluent interface": a
> key point is that we cannot just optimize for one or two use cases, but must
> consider the whole user space and ask what percentage of users
> may be affected. Note that in the DSL we have overloaded functions
> where Materialized / Joined / other options are NOT needed, so most
> normal users do not need to worry about the specs at all.
>
> So suppose there are only X% "advanced" users who would want to enforce
> some state store suppliers, and Y% who would like to use IQ. The remaining
> 100-X-Y percent of normal users see no difference in terms of programming
> between these two approaches, i.e. whether or not the specs are separated
> into a different set of APIs. The Y percent of users are most likely to just
> use the simplest API, which is `operator(..., Materialized.as(storeName))`, and
> that does not sound too bad compared to `table = operator(...);
> table.materialize(storeName)`. In other words, if we use the first approach
> then only X percent of users may have awkward code with complex
> option specs alongside the operator; if we use the second approach, the X+Y
> percent of users need to break their programming fluency to call
> `table.materialize` separately. And my personal guess is that
>
> 0 < X << Y < 100, i.e. X is very minor compared to Y. That is why I feel
> this is a good trade-off.
>
The key point here is that it doesn't matter. Any sufficiently useful
topology will get broken up by the user anyway to keep their code sane. And it
will most likely be broken up at the interesting parts, where IQ would be
useful. The math of how many people are affected by this is therefore not
important. Additionally, the comparison doesn't make sense, as the X group
can still go with a fluent interface; only the Y people need to break their
fluent interface.


> Guozhang
>
>
> On Fri, Aug 4, 2017 at 6:18 AM, Jan Filipiak <Ja...@trivago.com>
> wrote:
>
>> Hi Guozhang,
>>
>>   thank you very much for the reply. It explained a lot more of your
>> reasoning to me
>> once again!
>>
>> I have to disagree with you on the first point. You mentioned the join
>> case. A join is usually a "logically" materialized table, and its
>> KTableValueGetterSupplier is to be used when one wants to do a lookup. But
>> this is not at all what is currently happening. The join merge processor
>> currently maintains its own new state store when join is invoked with a
>> store name or supplier.
>>
>> This describes the issue I want to address perfectly. A joined table
>> doesn't become queryable because it is a JOINED table but because it is a
>> joined TABLE. The point is that we currently put the store logic with the
>> join and not with the table. It is part of the join() method invocation and
>> not of the KTable interface. This abstraction is wrong.
>>
>> This will always show its ugly face. Let's check your example:
>>
>> // the resulting KTable is materialized in order to complete the
>> // aggregation operation
>> stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
>>     // the resulting KTable is not materialized, but its GetterSupplier
>>     // is implemented to get values from "store1"
>>     .filter(Materialized.as("store2"))
>>
>> Currently this is only half true. For IQ, a store is used that is maintained
>> by the KTableFilterProcessor; for downstream gets like joins, the
>> ValueGetterSupplier is used and indeed uses store1.
>>
>> With the String overload (which you picked here on purpose, I guess) it
>> works more easily, as you can logically map those. But with the
>> StateStoreSupplier it wouldn't. You could not optimize this away, as the
>> user expects puts and gets to be called on what they supplied.
>>
>> table1.filter((key, value) -> true, InMemoryStore).filter((key, value) -> true, SQlLiteStore)
>>
>> There is no way to optimize these away.
>> The same argument as for the join holds for filter. It's not queryable
>> because it got filtered; it is queryable because it's a KTable. That's where
>> the emphasis needs to go.
>>
>> The second point was new to me, so I had to think about it in more
>> detail. For me, breaking the flow comes very naturally.
>>
>> One Streams app I put most of my heart into has these key metrics:
>> 8   input topics
>> 3   1:n joins
>> 6   group-bys
>> 2   built-in joins
>> 2   built-in left joins
>> some filters and mappers
>>
>> This spans 390 lines, counting Java imports and some more stuff.
>>
>> The whole topology forms a tree in which the input topics usually get
>> joined and then collected to maps, and then joined again and collected to
>> maps again, until they get sent to 1 final output topic for consumption in
>> our application servers.
>>
>> I would argue it is impossible to express this topology as a chain of
>> calls. What happened is that usually each join + groupBy pair became its
>> own method, taking in the builder and returning the table that expresses
>> the result of the sub-topology. All KTables that meet each other with the
>> same key in the process get joined (most of that happens at the top
>> level). This leads to breaks in the fluent interface quite naturally,
>> especially if you have 2 KTables expressing sub-topologies joined
>> together: one sub-topology would have to go into the method call, which is
>> unreasonable IMHO.
>>
>> Even inside these methods we broke the chains. The variable names we gave
>> to intermediate KTables really helped in making the semantics clear. They
>> are much like CTEs in Hive or the required names for MySQL subqueries. They
>> help to mark milestones inside the topology.
>>
>> I would argue that for big topologies (I haven't seen others, but I think
>> ours is big) these milestones would be the most important ones for IQ as
>> well. So I would argue that breaking the chains is not really a problem in
>> reality, and it can help in many cases. As I laid out, we broke our chained
>> calls intuitively, and it helped other developers a lot in debugging the
>> logic, even without detailed Streams understanding.
>>
>> If one really does not want to stop the flow, I would argue that one could
>> either do something like this
>>
>> KTable<Integer,Integer> joinresult;
>> KTable<Integer,Integer> t1 = b.table("laa");
>> KTable<Integer,Integer> t2 = b.table("luu");
>> (joinresult = t1.join(t2, (value1, value2) -> value1 + value2))
>>     .filter((key, value) -> false);
>>
>> or write a little snitch like this
>>
>> KTable<Integer,Integer> rememberTableandContinue(KTable<Integer,Integer> t) {
>>     joinresult = t;
>>     return t;
>> }
>>
>> for usage as such
>>
>> rememberTableandContinue(t1.join(t2, (value1, value2) -> value1 + value2))
>>     .filter((key, value) -> false);
>>
>> These suggestions might not look so pretty. But in the context of
>> breaking up a bigger topology at milestones, I think everything becomes
>> acceptable really. The user would probably store that intermediate KTable
>> anyway, just for clarity.
>>
>> To summarize: to give a KTable a name, I would always opt for the host
>> language's variable names. Tables used for IQ are probably tables that are
>> in some way more important to the topology than others, and saving them
>> separately will increase the readability of topologies by a lot, IMO.
>>
>> As for the quick example topologies that we have floating around all over
>> the place: I am pretty sure one can go unbroken on them, and usually the
>> last table will be the one that is needed for IQ.
>>
>>
>> Thanks again. The second point really got me thinking, as your perspective
>> on the importance of "not break the fluent interface" was not clear to me.
>> I hope I managed to lay out why I think it shouldn't have such a big weight
>> in the discussion.
>>
>> PS: check out Hive CTEs. Everyone loves them, and our analytics team is
>> crazy for them because you can name them, and that brings clarity. You also
>> get rid of the nesting and can split everything into logical chunks of SQL.
>> KTable variables are the CTEs of Kafka Streams. One can probably sell this
>> to people :)
>>
>> Best Jan
>> Enjoyed your feedback! Hope mine makes sense.
>>
>>
>>
>>
>>
>> On 03.08.2017 00:10, Guozhang Wang wrote:
>>
>>> Hello Jan,
>>>
>>> Thanks for your proposal. As Bill mentioned, the main difference is that we
>>> extract the user-customizable materialization logic out of the
>>> topology-building DSL workflow. The main motivations are twofold:
>>>
>>> 1) efficiency-wise, it allows some KTables not to be materialized if
>>> unnecessary, saving one state store instance and changelog topic.
>>>
>>> 2) programming-wise, it looks nicer to separate the topology construction
>>> code from the code that materializes KTables for IQ use.
>>>
>>>
>>> Here are my thoughts regarding these two points:
>>>
>>> Regarding 1), I think that with either public API (Damian's
>>> proposal or yours), we can always apply the internal optimization of not
>>> physically materializing the KTable. You can take a look at the internal
>>> interface "KTableValueGetterSupplier", which is used exactly for this
>>> purpose, so that a get call on a "logically" materialized KTable can be
>>> traced back to its parent KTables that are physically materialized in a
>>> state store. So, following the proposed APIs, for example:
>>>
>>>
>>> // the resulting KTable is materialized in order to complete the
>>> // aggregation operation
>>> stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
>>>     // the resulting KTable is not materialized, but its GetterSupplier
>>>     // is implemented to get values from "store1"
>>>     .filter(Materialized.as("store2"))
>>>
>>>
>>> Or
>>>
>>> table1 = stream.groupByKey(..).aggregate(..);
>>> table2 = table1.filter();
>>>
>>> // the resulting KTable is materialized in order to complete the
>>> // aggregation operation
>>> table1.queryHandle("store1");
>>> // the resulting KTable is not materialized, but its GetterSupplier
>>> // is implemented to get values from "store1"
>>> table2.queryHandle("store2");
>>>
>>>
>>>
>>> When a user queries a value from "store2", which is not actually
>>> materialized into a state store, the GetterSupplier will be triggered to
>>> in turn query the store for "store1" and then apply the filter operator
>>> on the fly to return the value. So the bottom line is, we can achieve the
>>> same efficiency optimization with either of the public APIs.
>>>
>>>
>>> Regarding 2), I actually have proposed a similar API to yours earlier in
>>> this discussion thread:
>>>
>>> ---------------------------------------
>>>
>>> // specifying the topology, should be concise and conveniently
>>> // concatenated, no specs of materialization at all
>>>
>>> KStream stream1 = builder.stream();
>>> // do not allow passing in a state store supplier here any more
>>> KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
>>>     sessionMerger, sessionWindows);
>>>
>>> // additional code to the topology above, could be more prescriptive
>>> // than descriptive
>>> // only advanced users would want to code in both parts above; while other
>>> // users would only code the topology as above.
>>>
>>> table1.materialize("queryableStoreName"); // or..
>>> table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
>>> // we check type (key-value types, windowed or not etc) at starting time
>>> // and add the metrics / logging / caching / windowing wrapper on top of
>>> // the store, or..
>>> table1.materialize(stateStoreSupplier);
>>> table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..
>>>
>>> ---------------------------------------
>>>
>>> But one caveat of that, as illustrated above, is that you need to have
>>> a separate KTable object in order to call either "queryHandle" or
>>> "materialize" (whatever the function name is) to specify the
>>> materialization options. This can break the concatenation of the topology
>>> construction part of the code, in that you cannot simply add one operator
>>> directly after another. So I think this is a trade-off we have to make, and
>>> the current approach looks better in this regard.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>>
>>> On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak<Ja...@trivago.com>
>>> wrote:
>>>
>>>> Hi Bill,
>>>>
>>>> totally! In the original discussion it was mentioned that the overloads
>>>> are nasty when implementing new features, so we wanted to get rid of them.
>>>> But what I felt was that the copy-and-pasted code in the KTableProcessors
>>>> for maintaining IQ stores was as big a hurdle as the overloads.
>>>>
>>>> With this proposal I try to shift things in the direction of getting IQ
>>>> for free if KTableValueGetterSupplier is properly implemented (like getting
>>>> join for free then), instead of having the code for maintaining IQ stores
>>>> all over the place. I realized I can do that while getting rid of the
>>>> overloads, which makes me feel my proposal is superior.
>>>>
>>>> Further, I try to optimize by using as few stores as possible to give the
>>>> user what they need. That should save all sorts of resources while
>>>> allowing faster rebalances.
>>>>
>>>> The ultimate target is to have only KTableSource and the Aggregators
>>>> maintain a store and provide a ValueGetterSupplier.
>>>>
>>>> Does this make sense to you?
>>>>
>>>> Best Jan
>>>>
>>>> On 02.08.2017 18:09, Bill Bejeck wrote:
>>>>
>>>>> Hi Jan,
>>>>>
>>>>> Thanks for the effort in putting your thoughts down on paper.
>>>>>
>>>>> Comparing what I see in your proposal with what is presented in KIP-182,
>>>>> one of the main differences is the exclusion of a `Materialized` instance
>>>>> in the `KTable` methods.
>>>>>
>>>>> Can you go into more detail on why this is so, and on the specific problems
>>>>> it avoids and/or solves with this approach?
>>>>>
>>>>> Thanks!
>>>>> Bill
>>>>>
>>>>> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>>>>
>>>>>       Hi Jan,
>>>>>
>>>>>       Thanks for taking the time to put this together, appreciated. For
>>>>>       the benefit of others would you mind explaining a bit about your
>>>>>       motivation?
>>>>>
>>>>>       Cheers,
>>>>>       Damian
>>>>>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jan,

Thanks for your feedback. Let me try to explain my points a bit more, since I
think there is still some miscommunication here:

Here are a few things I need to clarify for KIP-182 first:

1. KIP-182 is mainly about refactoring the public APIs, NOT about making any
optimizations to the internal implementations. So we only care that these
public API changes do not prevent us from optimizing the internal
implementations in the near future.

To give you a concrete example: as you mentioned, KTableValueGetterSupplier
is NOT used in IQ, and a materialized physical store is always used
today. Yes, that is true, and we do have plans to optimize this case soon;
for example, it is still doable with the proposed KIP-182 to
remove the physical materialized store and use KTableValueGetterSupplier to
read from an upstream physical store and apply the optimizations. Another
example you mentioned is the stream-stream join, where each stream is
physically materialized into a store; we can definitely optimize this in
the future to remove the physical materialized store and use something
else, e.g. an in-memory buffer. Such optimizations are NOT blocked by the
updated public APIs of KIP-182.
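
The idea of reading from an upstream physical store instead of keeping a second one can be sketched in plain Java without any Kafka dependency. The `ValueGetter` interface and the `fromStore` / `filtered` helpers below are hypothetical illustrations and not the internal KTableValueGetterSupplier API; a `HashMap` stands in for a state store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

public class ValueGetterSketch {

    /** Hypothetical read path of a table; only loosely modeled on the value-getter idea. */
    public interface ValueGetter<K, V> {
        V get(K key);
    }

    /** Physically materialized table: reads go straight to the backing map. */
    public static <K, V> ValueGetter<K, V> fromStore(Map<K, V> store) {
        return store::get;
    }

    /**
     * Logically materialized table: it owns no store. Each read is forwarded
     * to the parent and the filter is applied on the fly.
     */
    public static <K, V> ValueGetter<K, V> filtered(ValueGetter<K, V> parent,
                                                    BiPredicate<K, V> predicate) {
        return key -> {
            V value = parent.get(key);
            return (value != null && predicate.test(key, value)) ? value : null;
        };
    }

    public static void main(String[] args) {
        // "store1" is the only physical store in this sketch.
        Map<String, Long> store1 = new HashMap<>();
        store1.put("a", 1L);
        store1.put("b", 10L);

        ValueGetter<String, Long> parent = fromStore(store1);
        // "store2" exists only as a view over store1.
        ValueGetter<String, Long> store2View = filtered(parent, (key, value) -> value > 5);

        System.out.println("a -> " + store2View.get("a"));
        System.out.println("b -> " + store2View.get("b"));
    }
}
```

Because the view holds no data of its own, it needs no second store and no second changelog; the cost is that the filter runs on every read.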


2. One concern you raise, that KIP-182 may actually block such
optimizations, is that if users do specify a StateStoreSupplier then we
cannot optimize that away. That is true, but that is by design: if users do
specify a state store supplier in the Materialized API, that is equivalent to
saying "forget about doing any smart things, library, just use what I give you".
In other words, the above-mentioned optimizations can be applied if users
do not enforce any specific StateStoreSupplier, for example in

public static <K, V, S extends StateStore> Materialized<K, V, S> as(final String storeName)

i.e. the user only provides a store name, which acts like a handle token
for IQ; then the library still has the freedom to do smart things
internally, totally hidden from the users. It is similar to an
RDBMS or some NoSQL stores like Hive / Cassandra: the store engine does
not have the freedom to do query plan optimizations if users already
enforce specs like join ordering, query plan generation rules, etc.
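
The rule described in point 2 can be written down as a small decision function. This is a sketch under stated assumptions: `Spec`, `named`, `withSupplier` and `mustMaterialize` are hypothetical names, not the real Materialized API, and the `readableViaParent` flag stands in for whatever analysis the library would do to decide that reads can be served from an upstream store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class MaterializationPlanSketch {

    /** Hypothetical stand-in for a materialization spec; NOT the real Materialized class. */
    public static final class Spec {
        final String storeName;
        final Supplier<Map<String, Long>> storeSupplier; // null when only a name was given

        private Spec(String storeName, Supplier<Map<String, Long>> storeSupplier) {
            this.storeName = storeName;
            this.storeSupplier = storeSupplier;
        }

        /** Name only: the library keeps the freedom to optimize. */
        public static Spec named(String storeName) {
            return new Spec(storeName, null);
        }

        /** Explicit supplier: the user asks for exactly this store. */
        public static Spec withSupplier(String storeName, Supplier<Map<String, Long>> storeSupplier) {
            return new Spec(storeName, storeSupplier);
        }
    }

    /** Decides whether a physical store has to be created for this spec. */
    public static boolean mustMaterialize(Spec spec, boolean readableViaParent) {
        if (spec.storeSupplier != null) {
            return true; // "just use what I give you": never optimized away
        }
        return !readableViaParent; // name only: skip the store when a parent can serve reads
    }

    public static void main(String[] args) {
        System.out.println(mustMaterialize(Spec.named("store1"), false));
        System.out.println(mustMaterialize(Spec.named("store2"), true));
        System.out.println(mustMaterialize(Spec.withSupplier("store2", HashMap::new), true));
    }
}
```

The third call is the case the thread disagrees about: the same operator behaves differently depending on how the spec was built.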


3. About whether it is worthwhile to "not break the fluent interface": a
key point is that we cannot just optimize for one or two use cases, but must
consider the whole user space and ask what percentage of users
may be affected. Note that in the DSL we have overloaded functions
where Materialized / Joined / other options are NOT needed, so most
normal users do not need to worry about the specs at all.

So suppose there are only X% "advanced" users who would want to enforce
some state store suppliers, and Y% who would like to use IQ. The remaining
100-X-Y percent of normal users see no difference in terms of programming
between these two approaches, i.e. whether or not the specs are separated
into a different set of APIs. The Y percent of users are most likely to just
use the simplest API, which is `operator(..., Materialized.as(storeName))`, and
that does not sound too bad compared to `table = operator(...);
table.materialize(storeName)`. In other words, if we use the first approach
then only X percent of users may have awkward code with complex
option specs alongside the operator; if we use the second approach, the X+Y
percent of users need to break their programming fluency to call
`table.materialize` separately. And my personal guess is that

0 < X << Y < 100, i.e. X is very minor compared to Y. That is why I feel
this is a good trade-off.


Guozhang


On Fri, Aug 4, 2017 at 6:18 AM, Jan Filipiak <Ja...@trivago.com>
wrote:

> Hi Guozhang,
>
>  thank you very much for the reply. It explained a lot more of your
> reasoning to me
> once again!
>
> I have to disagree with you on the first point. You mentioned the join
> case. A join is usually a "logically" materialized table, and its
> KTableValueGetterSupplier is to be used when one wants to do a lookup. But
> this is not at all what is currently happening. The join merge processor
> currently maintains its own new state store when join is invoked with a
> store name or supplier.
>
> This describes the issue I want to address perfectly. A joined table
> doesn't become queryable because it is a JOINED table but because it is a
> joined TABLE. The point is that we currently put the store logic with the
> join and not with the table. It is part of the join() method invocation and
> not of the KTable interface. This abstraction is wrong.
>
> This will always show its ugly face. Let's check your example:
>
> // the resulting KTable is materialized in order to complete the
> // aggregation operation
> stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
>     // the resulting KTable is not materialized, but its GetterSupplier
>     // is implemented to get values from "store1"
>     .filter(Materialized.as("store2"))
>
> Currently this is only half true. For IQ, a store is used that is maintained
> by the KTableFilterProcessor; for downstream gets like joins, the
> ValueGetterSupplier is used and indeed uses store1.
>
> With the String overload (which you picked here on purpose, I guess) it
> works more easily, as you can logically map those. But with the
> StateStoreSupplier it wouldn't. You could not optimize this away, as the
> user expects puts and gets to be called on what they supplied.
>
> table1.filter((key, value) -> true, InMemoryStore).filter((key, value) -> true, SQlLiteStore)
>
> There is no way to optimize these away.
> The same argument as for the join holds for filter. It's not queryable
> because it got filtered; it is queryable because it's a KTable. That's where
> the emphasis needs to go.
>
> The second point was new to me, so I had to think about it in more
> detail. For me, breaking the flow comes very naturally.
>
> One Streams app I put most of my heart into has these key metrics:
> 8   input topics
> 3   1:n joins
> 6   group-bys
> 2   built-in joins
> 2   built-in left joins
> some filters and mappers
>
> This spans 390 lines, counting Java imports and some more stuff.
>
> The whole topology forms a tree in which the input topics usually get
> joined and then collected to maps, and then joined again and collected to
> maps again, until they get sent to 1 final output topic for consumption in
> our application servers.
>
> I would argue it is impossible to express this topology as a chain of
> calls. What happened is that
> usually each join + groupBy tuple became its method taking in the builder
> and return the Table
> expressing the result of the sub topology. All Ktables that meet each
> other with the same key in the
> process get joined (most of that happening on the top level). This leads
> to breaking in the fluent interface
> quite naturally. especially if you have 2 KTables expressing
> sub-topologies joined together. One subtopology had to go into the method
> call which is unreasonable IMHO.
>
> Even inside these methods we broke the chains. The variable names we used
> give intermediate KTables really helped in making the semantics clear. They
> are much like CTE's in hive or the required name in Mysql Subquerries. They
> help to mark milestones inside the topology.
>
> I would argue that for big topologies. (I haven't seen others but I think
> its big) these milestones would
> be the most important ones for IQ aswell. So i would argue breaking the
> chains is not really a problem in
> reality and it can help in many cases. As I laid out, we broke our chained
> calls intuitively and it helped
> other developers debugging the logic a lot. Even without detailed streams
> understanding.
>
> If one really do not want to stop the flow. I could argue that one could
> either do something like this
>
> KTable joinresult;
> KTable<Integer,Integer> t1 = b.table("laa");
> KTable<Integer,Integer> t2 = b.table("luu");
> (joinresult = t1.join(t2, (value1, value2) -> value1 + value2))
> .filter((key, value) -> false);
>
> or write a little snitch like that
>
> KTable<Integer,Integer> rememberTableandContinue(KTable<Integer,Integer>
> t){
>                 joinresult = t;
>                 return t;
> }
>
> for usuage as such
>
> rememberTableandContinue(t1.join(t2, (value1, value2) -> value1 + value2))
>                         .filter((key, value) -> false);
>
> These suggestions might not looks so pretty. But in the context of
> breaking bigger topology at milestones.
> I think everything becomes acceptable really. Probably user would store
> that intermediate  KTable anyways just for clarity.
>
> To summarize to give a KTable a name. I would always opt to the host
> language variable names.
> Tables used for IQ are probably tables that are of some sort more
> important to the topology than
> others and saving them separatly will increase the readability of
> topologies by a lot IMO.
>
> For these quick example Topologies that we have floating around in all
> places:
> I am pretty sure one can go unbroken on them and usually the last table
> will be the one that
> is needed for IQ then.
>
>
> Thanks again. The second point really got me thinking, as your perspective
> on the importance
> of "not break the fluent interface" was not clear to me. I hope I managed
> to line out why I
> think it shouldn't have such a big weight in the discussion.
>
> PS.: check out Hive CTE, everyone loves them and our Analytic team is
> crazy for them
> because you can name them and that brings clarity. and you get rid of the
> nesting and can
> split everything into logical chunks of SQL. KTable variables are the CTE
> of kafka streams.
> One can probably sell this to people :)
>
> Best Jan
> Enjoyed your feedback! hope mine makes sense
>
>
>
>
>
> On 03.08.2017 00:10, Guozhang Wang wrote:
>
>> Hello Jan,
>>
>> Thanks for your proposal. As Bill mentioned the main difference is that we
>> extract the user-customizable materialization logic out of the topology
>> building DSL workflow. And the main motivations are in two folds:
>>
>> 1) efficiency wise, it allows some KTables to not be materialized if
>> unnecessary, saving one state store instance and changelog topic.
>>
>> 2) programming wise, it looks nicer to separate the topology construction
>> code from the KTable materialization for IQ uses code.
>>
>>
>> Here are my thoughts regarding these two points:
>>
>> Regarding 1), I think with whichever the public APIs (either Damian's
>> proposal or yours), we can always apply the internal optimization to not
>> physically materialize the KTable. You can take a look at the internal
>> interface of "KTableValueGetterSupplier", which is used exactly for this
>> purposes such that a get call on a "logically" materialized KTable can be
>> traced back to its parent KTables that are physically materialized in a
>> state store. So following proposed APIs, for example:
>>
>>
>> stream.groupByKey(..).aggregate(.., Materializedas("store1"))        //
>> this resulted KTable is materialized in order to complete the aggregation
>> operation
>>                                      .filter(Materialized.as("store2"))
>>                 // this restuled KTable is not materialized but its
>> GetterSupplier is implemented to get values from "store1"
>>
>>
>> Or
>>
>> table1 = stream.groupByKey(..).aggregate(..);
>> table2 = table1.filter();
>>
>> tabel1.queryHandle("store1");       // this resulted KTable is
>> materialized
>> in order to complete the aggregation operation
>> tabel1.queryHandle("store2")        // this restuled KTable is not
>> materialized but its GetterSupplier is implemented to get values from
>> "store1"
>>
>>
>>
>> When user query a value for "store2" which is not actually materialized
>> into a state store, the GetterSupplier will be triggered to in turn query
>> the store for "store1", and then apply the filter operator on-the-fly to
>> return the value. So the bottom line is, we can achieve the same
>> efficiency
>> optimization with either of the public APIs.
>>
>>
>> Regarding 2), I actually have proposed a similar API to yours earlier in
>> this discussion thread:
>>
>> ---------------------------------------
>>
>> // specifying the topology, should be concise and conveniently
>> concatenated, no specs of materialization at all
>>
>> KStream stream1 = builder.stream();
>> KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
>> sessionMerger, sessionWindows);  // do not allow to pass-in a state store
>> supplier here any more
>>
>> // additional code to the topology above, could be more prescriptive
>> than descriptive
>> // only advanced users would want to code in both parts above; while other
>> users would only code the topology as above.
>>
>> table1.materialize("queryableStoreName"); // or..
>> table1.materialize("queryableStoreName").enableCaching().enableLogging();
>> // or..
>> table1.materialize(stateStoreSupplier); // we check type (key-value
>> types,
>> windowed or not etc) at starting time and add the metrics / logging /
>> caching / windowing wrapper on top of the store, or..
>> table1.materialize(stateStoreSupplier).enableCaching().enableLogging();
>> //
>> etc..
>>
>> ---------------------------------------
>>
>> But one caveat of that, as illustrated above, is that you need to have
>> separate object of the KTable in order to call either "queryHandle" or
>> "materialize" (whatever the function name is) for the specifications of
>> materialization options. This can break the concatenation of the topology
>> construction part of the code, that you cannot simply add one operator
>> directly after another. So I think this is a trade-off we have to make and
>> the current approach looks better in this regard.
>>
>>
>>
>> Guozhang
>>
>>
>>
>>
>> On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak<Ja...@trivago.com>
>> wrote:
>>
>> Hi Bill,
>>>
>>> totally! So in the original discussion it was mentioned that the
>>> overloads
>>> are nasty when implementing new features. So we wanted to get rid of
>>> them.
>>> But what I felt was that the
>>> copy & pasted code in the KTableProcessors for maintaining IQ stores was
>>> as big as a hurdle as the overloads.
>>>
>>> With this proposal I try to shift things into the direction of getting IQ
>>> for free if
>>> KTableValueGetterSupplier is properly implemented (like getting join for
>>> free then). Instead of having the code for maintaining IQ stores all the
>>> places. I realized I can do that while getting rid of the overloads, that
>>> makes me feel my proposal is superior.
>>>
>>> Further I try to optimize by using as few stores as possible to give the
>>> user what he needs. That should save all sorts of resources while
>>> allowing
>>> faster rebalances.
>>>
>>> The target ultimately is to only have KTableSource and the Aggregators
>>> maintain a Store and provide a ValueGetterSupplier.
>>>
>>> Does this makes sense to you?
>>>
>>> Best Jan
>>>
>>> On 02.08.2017 18:09, Bill Bejeck wrote:
>>>
>>> Hi Jan,
>>>>
>>>> Thanks for the effort in putting your thoughts down on paper.
>>>>
>>>> Comparing what I see from your proposal and what is presented in
>>>> KIP-182,
>>>> one of the main differences is the exclusion of an`Materialized`
>>>> instance
>>>> in the `KTable` methods.
>>>>
>>>> Can you go into more detail why this is so and the specific problems is
>>>> avoids and or solves with this approach?
>>>>
>>>> Thanks!
>>>> Bill
>>>>
>>>> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>>>
>>>>      Hi Jan,
>>>>
>>>>      Thanks for taking the time to put this together, appreciated. For
>>>> the
>>>>      benefit of others would you mind explaining a bit about your
>>>>      motivation?
>>>>
>>>>      Cheers,
>>>>      Damian
>>>>
>>>>      On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>>
>>>>      > Hi all,
>>>>      >
>>>>      > after some further discussions, the best thing to show my Idea
>>>>      of how it
>>>>      > should evolve would be a bigger mock/interface description.
>>>>      > The goal is to reduce the store maintaining processors to only
>>>> the
>>>>      > Aggregators + and KTableSource. While having KTableSource
>>>> optionally
>>>>      > materialized.
>>>>      >
>>>>      > Introducing KTable:copy() will allow users to maintain state
>>>>      twice if
>>>>      > they really want to. KStream::join*() wasn't touched. I never
>>>>      personally
>>>>      > used that so I didn't feel
>>>>      > comfortable enough touching it. Currently still making up my
>>>>      mind. None
>>>>      > of the suggestions made it querieable so far. Gouzhangs
>>>>      'Buffered' idea
>>>>      > seems ideal here.
>>>>      >
>>>>      > please have a look. Looking forward for your opinions.
>>>>      >
>>>>      > Best Jan
>>>>      >
>>>>      >
>>>>      >
>>>>      > On 21.06.2017 17:24, Eno Thereska wrote:
>>>>      > > (cc’ing user-list too)
>>>>      > >
>>>>      > > Given that we already have StateStoreSuppliers that are
>>>>      configurable
>>>>      > using the fluent-like API, probably it’s worth discussing the
>>>> other
>>>>      > examples with joins and serdes first since those have many
>>>>      overloads and
>>>>      > are in need of some TLC.
>>>>      > >
>>>>      > > So following your example, I guess you’d have something like:
>>>>      > > .join()
>>>>      > >     .withKeySerdes(…)
>>>>      > >     .withValueSerdes(…)
>>>>      > >     .withJoinType(“outer”)
>>>>      > >
>>>>      > > etc?
>>>>      > >
>>>>      > > I like the approach since it still remains declarative and
>>>>      it’d reduce
>>>>      > the number of overloads by quite a bit.
>>>>      > >
>>>>      > > Eno
>>>>      > >
>>>>      > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>>>      > >>
>>>>      > >> Hi,
>>>>      > >>
>>>>      > >> I'd like to get a discussion going around some of the API
>>>>      choices we've
>>>>      > >> made in the DLS. In particular those that relate to stateful
>>>>      operations
>>>>      > >> (though this could expand).
>>>>      > >> As it stands we lean heavily on overloaded methods in the
>>>>      API, i.e,
>>>>      > there
>>>>      > >> are 9 overloads for KGroupedStream.count(..)! It is becoming
>>>>      noisy and i
>>>>      > >> feel it is only going to get worse as we add more optional
>>>>      params. In
>>>>      > >> particular we've had some requests to be able to turn caching
>>>>      off, or
>>>>      > >> change log configs,  on a per operator basis (note this can
>>>>      be done now
>>>>      > if
>>>>      > >> you pass in a StateStoreSupplier, but this can be a bit
>>>>      cumbersome).
>>>>      > >>
>>>>      > >> So this is a bit of an open question. How can we change the
>>>> DSL
>>>>      > overloads
>>>>      > >> so that it flows, is simple to use and understand, and is
>>>> easily
>>>>      > extended
>>>>      > >> in the future?
>>>>      > >>
>>>>      > >> One option would be to use a fluent API approach for
>>>>      providing the
>>>>      > optional
>>>>      > >> params, so something like this:
>>>>      > >>
>>>>      > >> groupedStream.count()
>>>>      > >>    .withStoreName("name")
>>>>      > >>    .withCachingEnabled(false)
>>>>      > >>    .withLoggingEnabled(config)
>>>>      > >>    .table()
>>>>      > >>
>>>>      > >>
>>>>      > >>
>>>>      > >> Another option would be to provide a Builder to the count
>>>>      method, so it
>>>>      > >> would look something like this:
>>>>      > >> groupedStream.count(new
>>>>      > >> CountBuilder("storeName").withCachingEnabled(false).build())
>>>>      > >>
>>>>      > >> Another option is to say: Hey we don't need this, what are
>>>>      you on about!
>>>>      > >>
>>>>      > >> The above has focussed on state store related overloads, but
>>>>      the same
>>>>      > ideas
>>>>      > >> could  be applied to joins etc, where we presently have many
>>>> join
>>>>      > methods
>>>>      > >> and many overloads.
>>>>      > >>
>>>>      > >> Anyway, i look forward to hearing your opinions.
>>>>      > >>
>>>>      > >> Thanks,
>>>>      > >> Damian
>>>>      >
>>>>      >
>>>>
>>>>
>>>>
>>>>
>


-- 
-- Guozhang

Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Jan Filipiak <Ja...@trivago.com>.
Hi Guozhang,

  thank you very much for the reply. Once again it explained a lot more
of your reasoning to me!

I have to disagree with you on the first point. Take the join case you
mentioned. A join is usually a "logically" materialized table, and its
KTableValueGetterSupplier is meant to be used when one wants to do a lookup.
But this is not at all what currently happens. The join merge processor
currently maintains its own new state store when join is invoked with a
store name or supplier.

This describes the issue I want to address perfectly. A joined table
doesn't become queryable because it is a JOINED table but because it is a
joined TABLE. The point is that we currently put the store logic with the
join and not with the table: it is part of the join() method invocation and
not of the KTable interface. This abstraction is wrong.

This will always show its ugly face. Let's check your example:

// this resulting KTable is materialized in order to complete the
// aggregation operation
stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
   // this resulting KTable is not materialized, but its GetterSupplier
   // is implemented to get values from "store1"
   .filter(Materialized.as("store2"))

Currently this is only half true. For IQ, a store maintained by the
KTableFilterProcessor is used; for downstream gets such as joins, the
ValueGetterSupplier is used, and it does indeed use store1.

With the String overload (which I guess you picked here on purpose) this
works more easily, as you can logically map those. But with the
StateStoreSupplier it wouldn't: you could not optimize this away, as the
user expects puts and gets to be called on what he supplied.

table1.filter(() -> true, InMemoryStore).filter(()->true,SQlLiteStore)

There is no way to optimize these away.
The same argument as for the join holds for filter. It's not queryable
because it got filtered; it is queryable because it's a KTable. That's where
the emphasis needs to go.
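
To make the point concrete, here is a minimal plain-Java sketch with no Kafka Streams dependency. The names ValueGetter, storeGetter and joinGetter, and the map-backed stores, are made up for illustration and are not the real internal API. It shows a joined table that is queryable purely through its parents' getters, without maintaining a store of its own:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the idea behind KTableValueGetterSupplier:
// anything that is a table can answer get(key), materialized or not.
interface ValueGetter<K, V> {
    V get(K key);
}

final class JoinedTableSketch {

    // A physically materialized table: reads go straight to its store.
    static <K, V> ValueGetter<K, V> storeGetter(Map<K, V> store) {
        return store::get;
    }

    // A logically materialized inner join: no store of its own, the value
    // is computed from both parents at query time.
    static <K, L, R, O> ValueGetter<K, O> joinGetter(ValueGetter<K, L> left,
                                                     ValueGetter<K, R> right,
                                                     BiFunction<L, R, O> joiner) {
        return key -> {
            L l = left.get(key);
            R r = right.get(key);
            // Inner-join semantics: no result unless both sides have a value.
            return (l == null || r == null) ? null : joiner.apply(l, r);
        };
    }

    public static void main(String[] args) {
        Map<String, Integer> store1 = new HashMap<>();
        Map<String, Integer> store2 = new HashMap<>();
        store1.put("a", 1);
        store2.put("a", 2);
        store1.put("b", 5); // no match on the right side

        ValueGetter<String, Integer> joined =
            joinGetter(storeGetter(store1), storeGetter(store2), Integer::sum);

        System.out.println(joined.get("a")); // 3
        System.out.println(joined.get("b")); // null
    }
}
```

In this sketch the queryability comes from the table abstraction (the getter), not from the join operation; only the two parents hold state.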

The second point was new to me, so I had to think about it in more detail.
For me, breaking the flow comes very naturally.

One Streams app I put most of my heart into has these key metrics:
8   input topics.
3   1:n Joins
6   Group bys
2   built in Joins
2   built in left joins
some filters and mappers.

This spans 390 lines, counting Java imports and some more stuff.

The whole topology forms a tree in which the input topics usually get joined
and then collected to maps, and then joined again and collected to maps
again, until they get sent to one final output topic for consumption in our
application servers.

I would argue it is impossible to express this topology as a chain of calls.
What happened is that usually each join + groupBy tuple became its own
method, taking in the builder and returning the table expressing the result
of the sub-topology. All KTables that meet each other with the same key in
the process get joined (most of that happening on the top level). This leads
to breaking the fluent interface quite naturally, especially if you have two
KTables expressing sub-topologies joined together. One sub-topology would
have to go into the method call, which is unreasonable IMHO.

Even inside these methods we broke the chains. The variable names we gave to
intermediate KTables really helped in making the semantics clear. They are
much like CTEs in Hive or the required alias in MySQL subqueries. They help
to mark milestones inside the topology.

I would argue that for big topologies (I haven't seen others, but I think
ours is big) these milestones would be the most important ones for IQ as
well. So I would argue breaking the chains is not really a problem in
reality, and it can help in many cases. As I laid out, we broke our chained
calls intuitively, and it helped other developers a lot in debugging the
logic, even without detailed Streams understanding.

If one really does not want to stop the flow, I would argue that one could
either do something like this

KTable<Integer,Integer> joinresult;
KTable<Integer,Integer> t1 = b.table("laa");
KTable<Integer,Integer> t2 = b.table("luu");
(joinresult = t1.join(t2, (value1, value2) -> value1 + value2))
.filter((key, value) -> false);

or write a little snitch like this

KTable<Integer,Integer> rememberTableandContinue(KTable<Integer,Integer> t){
		joinresult = t;
		return t;
}

for usage as such

rememberTableandContinue(t1.join(t2, (value1, value2) -> value1 + value2))
			.filter((key, value) -> false);
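
The same trick can be written once, generically. A minimal sketch in plain Java, assuming nothing about Kafka Streams: the helper name remember and the String chain are illustrative stand-ins for a KTable chain.

```java
import java.util.function.Consumer;

final class RememberSketch {

    // Hands the intermediate value to the sink, then returns it unchanged
    // so the call chain can continue.
    static <T> T remember(T value, Consumer<? super T> sink) {
        sink.accept(value);
        return value;
    }

    public static void main(String[] args) {
        // One-element array as a mutable holder, since lambdas can only
        // capture effectively final locals.
        String[] milestone = new String[1];

        String result = remember("laa".concat("luu"), s -> milestone[0] = s)
            .toUpperCase();

        System.out.println(milestone[0]); // laaluu
        System.out.println(result);       // LAALUU
    }
}
```

The sink receives the intermediate result at the milestone while the chain itself stays unbroken.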

These suggestions might not look so pretty, but in the context of breaking a
bigger topology at milestones I think everything becomes acceptable really.
The user would probably store that intermediate KTable anyway, just for
clarity.

To summarize: to give a KTable a name, I would always opt for the host
language's variable names. Tables used for IQ are probably tables that are
in some way more important to the topology than others, and saving them
separately will increase the readability of topologies by a lot IMO.

For the quick example topologies that we have floating around all over the
place: I am pretty sure one can go unbroken on them, and usually the last
table will be the one that is needed for IQ then.


Thanks again. The second point really got me thinking, as your perspective
on the importance of "not breaking the fluent interface" was not clear to
me. I hope I managed to lay out why I think it shouldn't have such a big
weight in the discussion.

PS: check out Hive CTEs. Everyone loves them, and our analytics team is
crazy for them because you can name them and that brings clarity, and you
get rid of the nesting and can split everything into logical chunks of SQL.
KTable variables are the CTEs of Kafka Streams.
One can probably sell this to people :)

Best Jan
Enjoyed your feedback! hope mine makes sense




On 03.08.2017 00:10, Guozhang Wang wrote:
> Hello Jan,
>
> Thanks for your proposal. As Bill mentioned the main difference is that we
> extract the user-customizable materialization logic out of the topology
> building DSL workflow. And the main motivations are in two folds:
>
> 1) efficiency wise, it allows some KTables to not be materialized if
> unnecessary, saving one state store instance and changelog topic.
>
> 2) programming wise, it looks nicer to separate the topology construction
> code from the KTable materialization for IQ uses code.
>
>
> Here are my thoughts regarding these two points:
>
> Regarding 1), I think with whichever the public APIs (either Damian's
> proposal or yours), we can always apply the internal optimization to not
> physically materialize the KTable. You can take a look at the internal
> interface of "KTableValueGetterSupplier", which is used exactly for this
> purposes such that a get call on a "logically" materialized KTable can be
> traced back to its parent KTables that are physically materialized in a
> state store. So following proposed APIs, for example:
>
>
> stream.groupByKey(..).aggregate(.., Materializedas("store1"))        //
> this resulted KTable is materialized in order to complete the aggregation
> operation
>                                      .filter(Materialized.as("store2"))
>                 // this restuled KTable is not materialized but its
> GetterSupplier is implemented to get values from "store1"
>
>
> Or
>
> table1 = stream.groupByKey(..).aggregate(..);
> table2 = table1.filter();
>
> tabel1.queryHandle("store1");       // this resulted KTable is materialized
> in order to complete the aggregation operation
> tabel1.queryHandle("store2")        // this restuled KTable is not
> materialized but its GetterSupplier is implemented to get values from
> "store1"
>
>
>
> When user query a value for "store2" which is not actually materialized
> into a state store, the GetterSupplier will be triggered to in turn query
> the store for "store1", and then apply the filter operator on-the-fly to
> return the value. So the bottom line is, we can achieve the same efficiency
> optimization with either of the public APIs.
>
>
> Regarding 2), I actually have proposed a similar API to yours earlier in
> this discussion thread:
>
> ---------------------------------------
>
> // specifying the topology, should be concise and conveniently
> concatenated, no specs of materialization at all
>
> KStream stream1 = builder.stream();
> KTable table1 = stream1.groupby(...).aggregate(initializer, aggregator,
> sessionMerger, sessionWindows);  // do not allow to pass-in a state store
> supplier here any more
>
> // additional code to the topology above, could be more prescriptive
> than descriptive
> // only advanced users would want to code in both parts above; while other
> users would only code the topology as above.
>
> table1.materialize("queryableStoreName"); // or..
> table1.materialize("queryableStoreName").enableCaching().enableLogging();
> // or..
> table1.materialize(stateStoreSupplier); // we check type (key-value types,
> windowed or not etc) at starting time and add the metrics / logging /
> caching / windowing wrapper on top of the store, or..
> table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); //
> etc..
>
> ---------------------------------------
>
> But one caveat of that, as illustrated above, is that you need to have
> separate object of the KTable in order to call either "queryHandle" or
> "materialize" (whatever the function name is) for the specifications of
> materialization options. This can break the concatenation of the topology
> construction part of the code, that you cannot simply add one operator
> directly after another. So I think this is a trade-off we have to make and
> the current approach looks better in this regard.
>
>
>
> Guozhang
>
>
>
>
> On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak<Ja...@trivago.com>
> wrote:
>
>> Hi Bill,
>>
>> totally! So in the original discussion it was mentioned that the overloads
>> are nasty when implementing new features. So we wanted to get rid of them.
>> But what I felt was that the
>> copy & pasted code in the KTableProcessors for maintaining IQ stores was
>> as big as a hurdle as the overloads.
>>
>> With this proposal I try to shift things into the direction of getting IQ
>> for free if
>> KTableValueGetterSupplier is properly implemented (like getting join for
>> free then). Instead of having the code for maintaining IQ stores all the
>> places. I realized I can do that while getting rid of the overloads, that
>> makes me feel my proposal is superior.
>>
>> Further I try to optimize by using as few stores as possible to give the
>> user what he needs. That should save all sorts of resources while allowing
>> faster rebalances.
>>
>> The target ultimately is to only have KTableSource and the Aggregators
>> maintain a Store and provide a ValueGetterSupplier.
>>
>> Does this makes sense to you?
>>
>> Best Jan
>>
>> On 02.08.2017 18:09, Bill Bejeck wrote:
>>
>>> Hi Jan,
>>>
>>> Thanks for the effort in putting your thoughts down on paper.
>>>
>>> Comparing what I see from your proposal and what is presented in KIP-182,
>>> one of the main differences is the exclusion of an`Materialized`  instance
>>> in the `KTable` methods.
>>>
>>> Can you go into more detail why this is so and the specific problems is
>>> avoids and or solves with this approach?
>>>
>>> Thanks!
>>> Bill
>>>
>>> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>>
>>>      Hi Jan,
>>>
>>>      Thanks for taking the time to put this together, appreciated. For the
>>>      benefit of others would you mind explaining a bit about your
>>>      motivation?
>>>
>>>      Cheers,
>>>      Damian
>>>
>>>      On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>>
>>>      > Hi all,
>>>      >
>>>      > after some further discussions, the best thing to show my Idea
>>>      of how it
>>>      > should evolve would be a bigger mock/interface description.
>>>      > The goal is to reduce the store maintaining processors to only the
>>>      > Aggregators + and KTableSource. While having KTableSource optionally
>>>      > materialized.
>>>      >
>>>      > Introducing KTable:copy() will allow users to maintain state
>>>      twice if
>>>      > they really want to. KStream::join*() wasn't touched. I never
>>>      personally
>>>      > used that so I didn't feel
>>>      > comfortable enough touching it. Currently still making up my
>>>      mind. None
>>>      > of the suggestions made it querieable so far. Gouzhangs
>>>      'Buffered' idea
>>>      > seems ideal here.
>>>      >
>>>      > please have a look. Looking forward for your opinions.
>>>      >
>>>      > Best Jan
>>>      >
>>>      >
>>>      >
>>>      > On 21.06.2017 17:24, Eno Thereska wrote:
>>>      > > (cc’ing user-list too)
>>>      > >
>>>      > > Given that we already have StateStoreSuppliers that are
>>>      configurable
>>>      > using the fluent-like API, probably it’s worth discussing the other
>>>      > examples with joins and serdes first since those have many
>>>      overloads and
>>>      > are in need of some TLC.
>>>      > >
>>>      > > So following your example, I guess you’d have something like:
>>>      > > .join()
>>>      > >     .withKeySerdes(…)
>>>      > >     .withValueSerdes(…)
>>>      > >     .withJoinType(“outer”)
>>>      > >
>>>      > > etc?
>>>      > >
>>>      > > I like the approach since it still remains declarative and
>>>      it’d reduce
>>>      > the number of overloads by quite a bit.
>>>      > >
>>>      > > Eno
>>>      > >
>>>      > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>>      > >>
>>>      > >> Hi,
>>>      > >>
>>>      > >> I'd like to get a discussion going around some of the API
>>>      choices we've
>>>      > >> made in the DLS. In particular those that relate to stateful
>>>      operations
>>>      > >> (though this could expand).
>>>      > >> As it stands we lean heavily on overloaded methods in the
>>>      API, i.e,
>>>      > there
>>>      > >> are 9 overloads for KGroupedStream.count(..)! It is becoming
>>>      noisy and i
>>>      > >> feel it is only going to get worse as we add more optional
>>>      params. In
>>>      > >> particular we've had some requests to be able to turn caching
>>>      off, or
>>>      > >> change log configs,  on a per operator basis (note this can
>>>      be done now
>>>      > if
>>>      > >> you pass in a StateStoreSupplier, but this can be a bit
>>>      cumbersome).
>>>      > >>
>>>      > >> So this is a bit of an open question. How can we change the DSL
>>>      > overloads
>>>      > >> so that it flows, is simple to use and understand, and is easily
>>>      > extended
>>>      > >> in the future?
>>>      > >>
>>>      > >> One option would be to use a fluent API approach for
>>>      providing the
>>>      > optional
>>>      > >> params, so something like this:
>>>      > >>
>>>      > >> groupedStream.count()
>>>      > >>    .withStoreName("name")
>>>      > >>    .withCachingEnabled(false)
>>>      > >>    .withLoggingEnabled(config)
>>>      > >>    .table()
>>>      > >>
>>>      > >>
>>>      > >>
>>>      > >> Another option would be to provide a Builder to the count
>>>      method, so it
>>>      > >> would look something like this:
>>>      > >> groupedStream.count(new
>>>      > >> CountBuilder("storeName").withCachingEnabled(false).build())
>>>      > >>
>>>      > >> Another option is to say: Hey we don't need this, what are
>>>      you on about!
>>>      > >>
>>>      > >> The above has focussed on state store related overloads, but
>>>      the same
>>>      > ideas
>>>      > >> could  be applied to joins etc, where we presently have many join
>>>      > methods
>>>      > >> and many overloads.
>>>      > >>
>>>      > >> Anyway, i look forward to hearing your opinions.
>>>      > >>
>>>      > >> Thanks,
>>>      > >> Damian
>>>      >
>>>      >
>>>
>>>
>>>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Jan,

Thanks for your proposal. As Bill mentioned, the main difference is that we
extract the user-customizable materialization logic out of the topology
building DSL workflow. The main motivations are twofold:

1) efficiency-wise, it allows some KTables to not be materialized if
unnecessary, saving one state store instance and changelog topic.

2) programming-wise, it looks nicer to separate the topology construction
code from the code that materializes KTables for IQ use.


Here are my thoughts regarding these two points:

Regarding 1), I think with whichever of the public APIs (either Damian's
proposal or yours), we can always apply the internal optimization to not
physically materialize the KTable. You can take a look at the internal
interface of "KTableValueGetterSupplier", which is used exactly for this
purpose, such that a get call on a "logically" materialized KTable can be
traced back to its parent KTables that are physically materialized in a
state store. So following the proposed APIs, for example:


// this resulting KTable is materialized in order to complete the
// aggregation operation
stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
      // this resulting KTable is not materialized, but its GetterSupplier
      // is implemented to get values from "store1"
      .filter(Materialized.as("store2"))


Or

table1 = stream.groupByKey(..).aggregate(..);
table2 = table1.filter();

// the resulting KTable is materialized in order to complete the
// aggregation operation
table1.queryHandle("store1");
// the resulting KTable is not materialized, but its GetterSupplier is
// implemented to get values from "store1"
table2.queryHandle("store2");



When a user queries a value for "store2", which is not actually materialized
into a state store, the GetterSupplier will be triggered to in turn query
the store for "store1", and then apply the filter operator on-the-fly to
return the value. So the bottom line is, we can achieve the same efficiency
optimization with either of the public APIs.
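
To make the lookup path concrete, here is a minimal sketch of a getter
chain. It is not the Kafka Streams internal code: ValueGetter,
StoreBackedGetter and FilteredGetter are hypothetical names, and a plain
HashMap stands in for the physically materialized "store1". A get on the
filtered view is delegated to the parent getter and the filter predicate is
applied on the fly, so no second store is needed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Demo entry point. All types in this file are hypothetical stand-ins,
// not part of the Kafka Streams API.
class GetterChainDemo {
    public static void main(String[] args) {
        // Stand-in for the state store behind the aggregation ("store1").
        Map<String, Long> store1 = new HashMap<>();
        store1.put("a", 5L);
        store1.put("b", 50L);

        ValueGetter<String, Long> aggregated =
            new StoreBackedGetter<String, Long>(store1);
        // Logical "store2": no data of its own, only a parent and a predicate.
        ValueGetter<String, Long> filtered =
            new FilteredGetter<String, Long>(aggregated, (k, v) -> v > 10L);

        System.out.println(filtered.get("a")); // prints "null" (filtered out)
        System.out.println(filtered.get("b")); // prints "50"
    }
}

// Read-only view of a table, whether or not it is physically materialized.
interface ValueGetter<K, V> {
    V get(K key);
}

// Getter for a table that is physically materialized in a store.
final class StoreBackedGetter<K, V> implements ValueGetter<K, V> {
    private final Map<K, V> store;

    StoreBackedGetter(Map<K, V> store) {
        this.store = store;
    }

    @Override
    public V get(K key) {
        return store.get(key);
    }
}

// Getter for a filtered table that is only "logically" materialized:
// every lookup goes to the parent and the predicate is evaluated per call.
final class FilteredGetter<K, V> implements ValueGetter<K, V> {
    private final ValueGetter<K, V> parent;
    private final BiPredicate<K, V> predicate;

    FilteredGetter(ValueGetter<K, V> parent, BiPredicate<K, V> predicate) {
        this.parent = parent;
        this.predicate = predicate;
    }

    @Override
    public V get(K key) {
        V value = parent.get(key);
        // A missing key and a filtered-out value look the same to the caller.
        return (value != null && predicate.test(key, value)) ? value : null;
    }
}
```

The trade-off in this sketch is that every query on the filtered view pays
for a parent lookup plus a predicate evaluation, in exchange for not
keeping a second store and changelog topic.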


Regarding 2), I actually have proposed a similar API to yours earlier in
this discussion thread:

---------------------------------------

// specifying the topology, should be concise and conveniently
// concatenated, no specs of materialization at all

KStream stream1 = builder.stream();
// do not allow passing in a state store supplier here any more
KTable table1 = stream1.groupBy(...).aggregate(initializer, aggregator,
    sessionMerger, sessionWindows);

// additional code to the topology above, could be more prescriptive
// than descriptive
// only advanced users would want to code in both parts above; while other
// users would only code the topology as above.

table1.materialize("queryableStoreName"); // or..
table1.materialize("queryableStoreName").enableCaching().enableLogging(); // or..
// we check type (key-value types, windowed or not etc) at starting time
// and add the metrics / logging / caching / windowing wrapper on top of
// the store, or..
table1.materialize(stateStoreSupplier);
table1.materialize(stateStoreSupplier).enableCaching().enableLogging(); // etc..

---------------------------------------

But one caveat of that, as illustrated above, is that you need to have a
separate KTable object in order to call either "queryHandle" or
"materialize" (whatever the function name is) to specify the
materialization options. This can break the concatenation of the topology
construction part of the code, in that you cannot simply add one operator
directly after another. So I think this is a trade-off we have to make, and
the current approach looks better in this regard.
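
The chaining caveat can be seen in a toy model. The sketch below is purely
illustrative: ToyTable and its methods are made-up names, not KTable or any
proposed API. Both styles end up describing the same plan, but the style
with a separate materialize call needs a named variable for every
intermediate table that should get a store.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point. ToyTable is a hypothetical stand-in, not Kafka Streams API.
class ChainingDemo {
    public static void main(String[] args) {
        // Style A: store options are passed to the operator itself, so the
        // whole topology stays one uninterrupted chain.
        ToyTable inline = ToyTable.aggregate("store1").filter("store2").mapValues();

        // Style B: materialization is a separate call, so each intermediate
        // table that should get a store must be held in its own variable.
        ToyTable t1 = ToyTable.aggregate();
        ToyTable t2 = t1.filter();
        ToyTable out = t2.mapValues();
        t1.materialize("store1");
        t2.materialize("store2");

        // Both lines print: aggregate[store1] -> filter[store2] -> mapValues
        System.out.println(inline.describe());
        System.out.println(out.describe());
    }
}

// One node of a toy topology; all nodes of a chain share the same plan list.
final class ToyTable {
    private final List<String> plan; // operator labels in topology order
    private final int index;         // position of this table's operator

    private ToyTable(List<String> plan, int index) {
        this.plan = plan;
        this.index = index;
    }

    private static ToyTable start(String label) {
        List<String> newPlan = new ArrayList<>();
        newPlan.add(label);
        return new ToyTable(newPlan, 0);
    }

    private ToyTable append(String label) {
        plan.add(label);
        return new ToyTable(plan, plan.size() - 1);
    }

    static ToyTable aggregate() { return start("aggregate"); }
    static ToyTable aggregate(String store) { return start("aggregate[" + store + "]"); }

    ToyTable filter() { return append("filter"); }
    ToyTable filter(String store) { return append("filter[" + store + "]"); }

    ToyTable mapValues() { return append("mapValues"); }

    // Style B hook: attach a store name to this table's operator after the fact.
    void materialize(String store) {
        plan.set(index, plan.get(index) + "[" + store + "]");
    }

    String describe() {
        return String.join(" -> ", plan);
    }
}
```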



Guozhang




On Wed, Aug 2, 2017 at 2:07 PM, Jan Filipiak <Ja...@trivago.com>
wrote:

> Hi Bill,
>
> totally! So in the original discussion it was mentioned that the overloads
> are nasty when implementing new features. So we wanted to get rid of them.
> But what I felt was that the
> copy & pasted code in the KTableProcessors for maintaining IQ stores was
> as big as a hurdle as the overloads.
>
> With this proposal I try to shift things into the direction of getting IQ
> for free if
> KTableValueGetterSupplier is properly implemented (like getting join for
> free then). Instead of having the code for maintaining IQ stores all the
> places. I realized I can do that while getting rid of the overloads, that
> makes me feel my proposal is superior.
>
> Further I try to optimize by using as few stores as possible to give the
> user what he needs. That should save all sorts of resources while allowing
> faster rebalances.
>
> The target ultimately is to only have KTableSource and the Aggregators
> maintain a Store and provide a ValueGetterSupplier.
>
> Does this makes sense to you?
>
> Best Jan
>
> On 02.08.2017 18:09, Bill Bejeck wrote:
>
>> Hi Jan,
>>
>> Thanks for the effort in putting your thoughts down on paper.
>>
>> Comparing what I see from your proposal and what is presented in KIP-182,
>> one of the main differences is the exclusion of an`Materialized`  instance
>> in the `KTable` methods.
>>
>> Can you go into more detail why this is so and the specific problems is
>> avoids and or solves with this approach?
>>
>> Thanks!
>> Bill
>>
>> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian.guy@gmail.com> wrote:
>>
>>     Hi Jan,
>>
>>     Thanks for taking the time to put this together, appreciated. For the
>>     benefit of others would you mind explaining a bit about your
>>     motivation?
>>
>>     Cheers,
>>     Damian
>>
>>     On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>>
>>     > Hi all,
>>     >
>>     > after some further discussions, the best thing to show my Idea
>>     of how it
>>     > should evolve would be a bigger mock/interface description.
>>     > The goal is to reduce the store maintaining processors to only the
>>     > Aggregators + and KTableSource. While having KTableSource optionally
>>     > materialized.
>>     >
>>     > Introducing KTable:copy() will allow users to maintain state
>>     twice if
>>     > they really want to. KStream::join*() wasn't touched. I never
>>     personally
>>     > used that so I didn't feel
>>     > comfortable enough touching it. Currently still making up my
>>     mind. None
>>     > of the suggestions made it querieable so far. Gouzhangs
>>     'Buffered' idea
>>     > seems ideal here.
>>     >
>>     > please have a look. Looking forward for your opinions.
>>     >
>>     > Best Jan
>>     >
>>     >
>>     >
>>     > On 21.06.2017 17:24, Eno Thereska wrote:
>>     > > (cc’ing user-list too)
>>     > >
>>     > > Given that we already have StateStoreSuppliers that are
>>     configurable
>>     > using the fluent-like API, probably it’s worth discussing the other
>>     > examples with joins and serdes first since those have many
>>     overloads and
>>     > are in need of some TLC.
>>     > >
>>     > > So following your example, I guess you’d have something like:
>>     > > .join()
>>     > >     .withKeySerdes(…)
>>     > >     .withValueSerdes(…)
>>     > >     .withJoinType(“outer”)
>>     > >
>>     > > etc?
>>     > >
>>     > > I like the approach since it still remains declarative and
>>     it’d reduce
>>     > the number of overloads by quite a bit.
>>     > >
>>     > > Eno
>>     > >
>>     > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>     > >>
>>     > >> Hi,
>>     > >>
>>     > >> I'd like to get a discussion going around some of the API
>>     choices we've
>>     > >> made in the DLS. In particular those that relate to stateful
>>     operations
>>     > >> (though this could expand).
>>     > >> As it stands we lean heavily on overloaded methods in the
>>     API, i.e,
>>     > there
>>     > >> are 9 overloads for KGroupedStream.count(..)! It is becoming
>>     noisy and i
>>     > >> feel it is only going to get worse as we add more optional
>>     params. In
>>     > >> particular we've had some requests to be able to turn caching
>>     off, or
>>     > >> change log configs,  on a per operator basis (note this can
>>     be done now
>>     > if
>>     > >> you pass in a StateStoreSupplier, but this can be a bit
>>     cumbersome).
>>     > >>
>>     > >> So this is a bit of an open question. How can we change the DSL
>>     > overloads
>>     > >> so that it flows, is simple to use and understand, and is easily
>>     > extended
>>     > >> in the future?
>>     > >>
>>     > >> One option would be to use a fluent API approach for
>>     providing the
>>     > optional
>>     > >> params, so something like this:
>>     > >>
>>     > >> groupedStream.count()
>>     > >>    .withStoreName("name")
>>     > >>    .withCachingEnabled(false)
>>     > >>    .withLoggingEnabled(config)
>>     > >>    .table()
>>     > >>
>>     > >>
>>     > >>
>>     > >> Another option would be to provide a Builder to the count
>>     method, so it
>>     > >> would look something like this:
>>     > >> groupedStream.count(new
>>     > >> CountBuilder("storeName").withCachingEnabled(false).build())
>>     > >>
>>     > >> Another option is to say: Hey we don't need this, what are
>>     you on about!
>>     > >>
>>     > >> The above has focussed on state store related overloads, but
>>     the same
>>     > ideas
>>     > >> could  be applied to joins etc, where we presently have many join
>>     > methods
>>     > >> and many overloads.
>>     > >>
>>     > >> Anyway, i look forward to hearing your opinions.
>>     > >>
>>     > >> Thanks,
>>     > >> Damian
>>     >
>>     >
>>
>>
>>
>


-- 
-- Guozhang

Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Jan Filipiak <Ja...@trivago.com>.
Hi Bill,

totally! So in the original discussion it was mentioned that the
overloads are nasty when implementing new features. So we wanted to get
rid of them. But what I felt was that the copy & pasted code in the
KTableProcessors for maintaining IQ stores was as big a hurdle as the
overloads.

With this proposal I try to shift things in the direction of getting
IQ for free if KTableValueGetterSupplier is properly implemented (like
getting joins for free then), instead of having the code for maintaining
IQ stores all over the place. I realized I can do that while getting rid
of the overloads, which makes me feel my proposal is superior.

Further I try to optimize by using as few stores as possible to give the 
user what he needs. That should save all sorts of resources while 
allowing faster rebalances.

The target ultimately is to only have KTableSource and the Aggregators 
maintain a Store and provide a ValueGetterSupplier.

Does this make sense to you?

Best Jan

On 02.08.2017 18:09, Bill Bejeck wrote:
> Hi Jan,
>
> Thanks for the effort in putting your thoughts down on paper.
>
> Comparing what I see from your proposal and what is presented in 
> KIP-182, one of the main differences is the exclusion of 
> an`Materialized`  instance in the `KTable` methods.
>
> Can you go into more detail why this is so and the specific problems 
> is avoids and or solves with this approach?
>
> Thanks!
> Bill
>
> On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <damian.guy@gmail.com> wrote:
>
>     Hi Jan,
>
>     Thanks for taking the time to put this together, appreciated. For the
>     benefit of others would you mind explaining a bit about your
>     motivation?
>
>     Cheers,
>     Damian
>
>     On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Jan.Filipiak@trivago.com> wrote:
>
>     > Hi all,
>     >
>     > after some further discussions, the best thing to show my Idea
>     of how it
>     > should evolve would be a bigger mock/interface description.
>     > The goal is to reduce the store maintaining processors to only the
>     > Aggregators + and KTableSource. While having KTableSource optionally
>     > materialized.
>     >
>     > Introducing KTable:copy() will allow users to maintain state
>     twice if
>     > they really want to. KStream::join*() wasn't touched. I never
>     personally
>     > used that so I didn't feel
>     > comfortable enough touching it. Currently still making up my
>     mind. None
>     > of the suggestions made it querieable so far. Gouzhangs
>     'Buffered' idea
>     > seems ideal here.
>     >
>     > please have a look. Looking forward for your opinions.
>     >
>     > Best Jan
>     >
>     >
>     >
>     > On 21.06.2017 17:24, Eno Thereska wrote:
>     > > (cc’ing user-list too)
>     > >
>     > > Given that we already have StateStoreSuppliers that are
>     configurable
>     > using the fluent-like API, probably it’s worth discussing the other
>     > examples with joins and serdes first since those have many
>     overloads and
>     > are in need of some TLC.
>     > >
>     > > So following your example, I guess you’d have something like:
>     > > .join()
>     > >     .withKeySerdes(…)
>     > >     .withValueSerdes(…)
>     > >     .withJoinType(“outer”)
>     > >
>     > > etc?
>     > >
>     > > I like the approach since it still remains declarative and
>     it’d reduce
>     > the number of overloads by quite a bit.
>     > >
>     > > Eno
>     > >
>     > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <damian.guy@gmail.com> wrote:
>     > >>
>     > >> Hi,
>     > >>
>     > >> I'd like to get a discussion going around some of the API
>     choices we've
>     > >> made in the DLS. In particular those that relate to stateful
>     operations
>     > >> (though this could expand).
>     > >> As it stands we lean heavily on overloaded methods in the
>     API, i.e,
>     > there
>     > >> are 9 overloads for KGroupedStream.count(..)! It is becoming
>     noisy and i
>     > >> feel it is only going to get worse as we add more optional
>     params. In
>     > >> particular we've had some requests to be able to turn caching
>     off, or
>     > >> change log configs,  on a per operator basis (note this can
>     be done now
>     > if
>     > >> you pass in a StateStoreSupplier, but this can be a bit
>     cumbersome).
>     > >>
>     > >> So this is a bit of an open question. How can we change the DSL
>     > overloads
>     > >> so that it flows, is simple to use and understand, and is easily
>     > extended
>     > >> in the future?
>     > >>
>     > >> One option would be to use a fluent API approach for
>     providing the
>     > optional
>     > >> params, so something like this:
>     > >>
>     > >> groupedStream.count()
>     > >>    .withStoreName("name")
>     > >>    .withCachingEnabled(false)
>     > >>    .withLoggingEnabled(config)
>     > >>    .table()
>     > >>
>     > >>
>     > >>
>     > >> Another option would be to provide a Builder to the count
>     method, so it
>     > >> would look something like this:
>     > >> groupedStream.count(new
>     > >> CountBuilder("storeName").withCachingEnabled(false).build())
>     > >>
>     > >> Another option is to say: Hey we don't need this, what are
>     you on about!
>     > >>
>     > >> The above has focussed on state store related overloads, but
>     the same
>     > ideas
>     > >> could  be applied to joins etc, where we presently have many join
>     > methods
>     > >> and many overloads.
>     > >>
>     > >> Anyway, i look forward to hearing your opinions.
>     > >>
>     > >> Thanks,
>     > >> Damian
>     >
>     >
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Jan,

Thanks for the effort in putting your thoughts down on paper.

Comparing what I see from your proposal and what is presented in KIP-182,
one of the main differences is the exclusion of a `Materialized` instance
in the `KTable` methods.

Can you go into more detail on why this is so and the specific problems it
avoids and/or solves with this approach?

Thanks!
Bill

On Wed, Aug 2, 2017 at 4:19 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Jan,
>
> Thanks for taking the time to put this together, appreciated. For the
> benefit of others would you mind explaining a bit about your motivation?
>
> Cheers,
> Damian
>
> On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Ja...@trivago.com> wrote:
>
> > Hi all,
> >
> > after some further discussions, the best thing to show my Idea of how it
> > should evolve would be a bigger mock/interface description.
> > The goal is to reduce the store maintaining processors to only the
> > Aggregators + and KTableSource. While having KTableSource optionally
> > materialized.
> >
> > Introducing KTable:copy() will allow users to maintain state twice if
> > they really want to. KStream::join*() wasn't touched. I never personally
> > used that so I didn't feel
> > comfortable enough touching it. Currently still making up my mind. None
> > of the suggestions made it querieable so far. Gouzhangs 'Buffered' idea
> > seems ideal here.
> >
> > please have a look. Looking forward for your opinions.
> >
> > Best Jan
> >
> >
> >
> > On 21.06.2017 17:24, Eno Thereska wrote:
> > > (cc’ing user-list too)
> > >
> > > Given that we already have StateStoreSuppliers that are configurable
> > using the fluent-like API, probably it’s worth discussing the other
> > examples with joins and serdes first since those have many overloads and
> > are in need of some TLC.
> > >
> > > So following your example, I guess you’d have something like:
> > > .join()
> > >     .withKeySerdes(…)
> > >     .withValueSerdes(…)
> > >     .withJoinType(“outer”)
> > >
> > > etc?
> > >
> > > I like the approach since it still remains declarative and it’d reduce
> > the number of overloads by quite a bit.
> > >
> > > Eno
> > >
> > >> On Jun 21, 2017, at 3:37 PM, Damian Guy <da...@gmail.com> wrote:
> > >>
> > >> Hi,
> > >>
> > >> I'd like to get a discussion going around some of the API choices
> we've
> > >> made in the DLS. In particular those that relate to stateful
> operations
> > >> (though this could expand).
> > >> As it stands we lean heavily on overloaded methods in the API, i.e,
> > there
> > >> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy
> and i
> > >> feel it is only going to get worse as we add more optional params. In
> > >> particular we've had some requests to be able to turn caching off, or
> > >> change log configs,  on a per operator basis (note this can be done
> now
> > if
> > >> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> > >>
> > >> So this is a bit of an open question. How can we change the DSL
> > overloads
> > >> so that it flows, is simple to use and understand, and is easily
> > extended
> > >> in the future?
> > >>
> > >> One option would be to use a fluent API approach for providing the
> > optional
> > >> params, so something like this:
> > >>
> > >> groupedStream.count()
> > >>    .withStoreName("name")
> > >>    .withCachingEnabled(false)
> > >>    .withLoggingEnabled(config)
> > >>    .table()
> > >>
> > >>
> > >>
> > >> Another option would be to provide a Builder to the count method, so
> it
> > >> would look something like this:
> > >> groupedStream.count(new
> > >> CountBuilder("storeName").withCachingEnabled(false).build())
> > >>
> > >> Another option is to say: Hey we don't need this, what are you on
> about!
> > >>
> > >> The above has focussed on state store related overloads, but the same
> > ideas
> > >> could  be applied to joins etc, where we presently have many join
> > methods
> > >> and many overloads.
> > >>
> > >> Anyway, i look forward to hearing your opinions.
> > >>
> > >> Thanks,
> > >> Damian
> >
> >
>

Re: [DISCUSS] Streams DSL/StateStore Refactoring

Posted by Damian Guy <da...@gmail.com>.
Hi Jan,

Thanks for taking the time to put this together, appreciated. For the
benefit of others would you mind explaining a bit about your motivation?

Cheers,
Damian

On Wed, 2 Aug 2017 at 01:40 Jan Filipiak <Ja...@trivago.com> wrote:

> Hi all,
>
> after some further discussions, the best thing to show my Idea of how it
> should evolve would be a bigger mock/interface description.
> The goal is to reduce the store maintaining processors to only the
> Aggregators + and KTableSource. While having KTableSource optionally
> materialized.
>
> Introducing KTable:copy() will allow users to maintain state twice if
> they really want to. KStream::join*() wasn't touched. I never personally
> used that so I didn't feel
> comfortable enough touching it. Currently still making up my mind. None
> of the suggestions made it querieable so far. Gouzhangs 'Buffered' idea
> seems ideal here.
>
> please have a look. Looking forward for your opinions.
>
> Best Jan
>
>
>
> On 21.06.2017 17:24, Eno Thereska wrote:
> > (cc’ing user-list too)
> >
> > Given that we already have StateStoreSuppliers that are configurable
> using the fluent-like API, probably it’s worth discussing the other
> examples with joins and serdes first since those have many overloads and
> are in need of some TLC.
> >
> > So following your example, I guess you’d have something like:
> > .join()
> >     .withKeySerdes(…)
> >     .withValueSerdes(…)
> >     .withJoinType(“outer”)
> >
> > etc?
> >
> > I like the approach since it still remains declarative and it’d reduce
> the number of overloads by quite a bit.
> >
> > Eno
> >
> >> On Jun 21, 2017, at 3:37 PM, Damian Guy <da...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> I'd like to get a discussion going around some of the API choices we've
> >> made in the DLS. In particular those that relate to stateful operations
> >> (though this could expand).
> >> As it stands we lean heavily on overloaded methods in the API, i.e,
> there
> >> are 9 overloads for KGroupedStream.count(..)! It is becoming noisy and i
> >> feel it is only going to get worse as we add more optional params. In
> >> particular we've had some requests to be able to turn caching off, or
> >> change log configs,  on a per operator basis (note this can be done now
> if
> >> you pass in a StateStoreSupplier, but this can be a bit cumbersome).
> >>
> >> So this is a bit of an open question. How can we change the DSL
> overloads
> >> so that it flows, is simple to use and understand, and is easily
> extended
> >> in the future?
> >>
> >> One option would be to use a fluent API approach for providing the
> optional
> >> params, so something like this:
> >>
> >> groupedStream.count()
> >>    .withStoreName("name")
> >>    .withCachingEnabled(false)
> >>    .withLoggingEnabled(config)
> >>    .table()
> >>
> >>
> >>
> >> Another option would be to provide a Builder to the count method, so it
> >> would look something like this:
> >> groupedStream.count(new
> >> CountBuilder("storeName").withCachingEnabled(false).build())
> >>
> >> Another option is to say: Hey we don't need this, what are you on about!
> >>
> >> The above has focussed on state store related overloads, but the same
> ideas
> >> could  be applied to joins etc, where we presently have many join
> methods
> >> and many overloads.
> >>
> >> Anyway, i look forward to hearing your opinions.
> >>
> >> Thanks,
> >> Damian
>
>