Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2018/06/10 22:04:28 UTC

Re: [DISCUSSION] KIP-300: Add Windowed KTable API in StreamsBuilder

Thanks a lot for the KIP. The general idea of allowing users to read
windowed KTables is very useful! A couple of initial comments/questions:



About only adding a single `windowedTable()` with no overloads:

 - retention time is not a mandatory parameter, and we can always use the
default of 1 day

 - instead of requiring both `Consumed` and `Materialized`, we could also
extend `WindowedSerde` to expose its wrapped key-Serde; thus, if a user
passes in a `WindowedSerde`, the inner key-Serde can be extracted and users
do not need to pass in the key-Serde explicitly.

 - while I agree that we need to pass in the window-size parameter, I am
not sure if using `Materialized` is the best way to do this; it seems that
window-size is the only mandatory parameter, so we might be able to
pass it directly and make `Consumed` and `Materialized`
optional. Something like:

> windowedTable(String topicName, long windowSizeMs);

As a (maybe better) alternative, we could also introduce a public
`Windowed` interface similar to `Produced`, `Consumed`, `Materialized`,
etc. that we use to pass in window parameters. Or maybe we could reuse the
existing `Windows` class (i.e., the same definition that is used in
`KGroupedStream#windowedBy()` could be passed into the new `windowedTable()`
method).
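To make the "only the window size is mandatory" idea concrete, here is a minimal, dependency-free Java sketch of the overload shape. Everything in it is hypothetical: `WindowedTableSpec` and the static `windowedTable(...)` methods are stand-ins rather than Kafka Streams API, and the check that retention covers at least one full window is an assumption of this sketch, not something the thread specifies. A `Windows`-style parameter object, as suggested above, would carry the same two values.

```java
import java.util.Objects;

// Hypothetical sketch: only the window size is mandatory; retention defaults.
// WindowedTableSpec and windowedTable(...) are illustrative stand-ins, not Kafka Streams API.
public class WindowedTableOverloadsSketch {

    // One day in milliseconds, the default retention mentioned in the thread.
    static final long DEFAULT_RETENTION_MS = 24L * 60 * 60 * 1000;

    // What a builder would need to know to create the windowed table.
    static final class WindowedTableSpec {
        final String topic;
        final long windowSizeMs;
        final long retentionMs;

        WindowedTableSpec(String topic, long windowSizeMs, long retentionMs) {
            this.topic = Objects.requireNonNull(topic, "topic");
            if (windowSizeMs <= 0) {
                throw new IllegalArgumentException("windowSizeMs must be positive");
            }
            if (retentionMs < windowSizeMs) {
                // Assumed sanity check: keep at least one full window.
                throw new IllegalArgumentException("retentionMs must be >= windowSizeMs");
            }
            this.windowSizeMs = windowSizeMs;
            this.retentionMs = retentionMs;
        }

        @Override
        public String toString() {
            return topic + " size=" + windowSizeMs + "ms retention=" + retentionMs + "ms";
        }
    }

    // Minimal form: topic plus window size; retention uses the one-day default.
    static WindowedTableSpec windowedTable(String topic, long windowSizeMs) {
        return windowedTable(topic, windowSizeMs, DEFAULT_RETENTION_MS);
    }

    // Optional override of the retention time.
    static WindowedTableSpec windowedTable(String topic, long windowSizeMs, long retentionMs) {
        return new WindowedTableSpec(topic, windowSizeMs, retentionMs);
    }

    public static void main(String[] args) {
        System.out.println(windowedTable("windowed-counts", 60_000L));
        System.out.println(windowedTable("windowed-counts", 60_000L, 7 * DEFAULT_RETENTION_MS));
    }
}
```

The short overload delegates to the long one, so the default lives in exactly one place.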



I also noticed that the KIP only covers `TimeWindows`. Should we extend
it to cover `SessionWindows`, too?
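Why the window size matters for time windows, and why session windows may not need it: below is a sketch of the windowed-key byte layouts as I understand Kafka Streams' internal key schemas. Treat the exact layout as an assumption; this is not the library code, and the class and method names are hypothetical. A time-windowed key appears to store only the window start after the key bytes, so a reader must be told the window size to rebuild the window end, while a session-windowed key appears to store both the end and the start.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of windowed-key byte layouts. Assumed to mirror Kafka Streams' internal
// key schemas; the class and method names here are hypothetical.
public class WindowedKeyLayoutSketch {

    static final int TIMESTAMP_SIZE = 8;

    // Time window: [key bytes][window start]. The window end is not stored.
    static byte[] encodeTimeWindowed(byte[] key, long windowStartMs) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key)
                .putLong(windowStartMs)
                .array();
    }

    // Rebuilding the window needs the size from the caller: end = start + size.
    static long[] decodeTimeWindow(byte[] binaryKey, long windowSizeMs) {
        long start = ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE);
        long end = start + windowSizeMs;
        if (end < 0) {
            end = Long.MAX_VALUE; // clamp on overflow
        }
        return new long[] {start, end};
    }

    // Session window: [key bytes][window end][window start]. Both bounds are stored.
    static byte[] encodeSessionWindowed(byte[] key, long startMs, long endMs) {
        return ByteBuffer.allocate(key.length + 2 * TIMESTAMP_SIZE)
                .put(key)
                .putLong(endMs)
                .putLong(startMs)
                .array();
    }

    // No window size needed: both bounds are read back from the key itself.
    static long[] decodeSessionWindow(byte[] binaryKey) {
        ByteBuffer buffer = ByteBuffer.wrap(binaryKey);
        long end = buffer.getLong(binaryKey.length - 2 * TIMESTAMP_SIZE);
        long start = buffer.getLong(binaryKey.length - TIMESTAMP_SIZE);
        return new long[] {start, end};
    }

    // Strips the trailing timestamp bytes to recover the raw key bytes.
    static byte[] rawKey(byte[] binaryKey, int trailingBytes) {
        return Arrays.copyOf(binaryKey, binaryKey.length - trailingBytes);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);

        byte[] timeKey = encodeTimeWindowed(key, 1_000L);
        long[] tw = decodeTimeWindow(timeKey, 60_000L);
        System.out.println(new String(rawKey(timeKey, TIMESTAMP_SIZE), StandardCharsets.UTF_8)
                + " time window [" + tw[0] + ", " + tw[1] + ")");

        byte[] sessionKey = encodeSessionWindowed(key, 5_000L, 9_000L);
        long[] sw = decodeSessionWindow(sessionKey);
        System.out.println(new String(rawKey(sessionKey, 2 * TIMESTAMP_SIZE), StandardCharsets.UTF_8)
                + " session window [" + sw[0] + ", " + sw[1] + "]");
    }
}
```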



> One side effect is that we bring `ChangeLoggingWindowBytesStore`
public for unit test purpose.

No need to mention this, because this class is in package "internals"
and is not part of the public API.




-Matthias



On 5/24/18 10:39 PM, Boyang Chen wrote:
> Hey friends,
> 
> 
> I know this is a critical time for the 2.0 release. I just want to call out again for further review of the API format. Any feedback would be appreciated, thank you!
> 
> 
> Boyang
> 
> ________________________________
> From: Liquan Pei <li...@gmail.com>
> Sent: Tuesday, May 22, 2018 4:29 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSSION] KIP-300: Add Windowed KTable API in StreamsBuilder
> 
> This KIP makes sharing a WindowedKTable among Kafka Streams jobs very easy.
> It would be nice to get this into trunk soon.
> 
> Best,
> Liquan
> 
> On Mon, May 21, 2018 at 12:25 PM, Boyang Chen <bc...@outlook.com> wrote:
> 
>> Hey all,
>>
>>
>> I would like to start a discussion thread on KIP 300, which introduces a
>> new API called windowedTable() in StreamsBuilder:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder
>>
>>
>> The pull request I'm working on is here: https://github.com/apache/kafka/pull/5044
>>
>>
>> I understand that the community is busy working on the 2.0 release, but this
>> KIP is really important for our internal use case. So if any of you have
>> time, please focus on clarifying the use case and reaching agreement on the
>> API. I really appreciate your time!
>>
>>
>> Best,
>>
>> Boyang
>>
>>
>>
> 
> 
> --
> Liquan Pei
> Software Engineer, Confluent Inc
> 


Re: [DISCUSSION] KIP-300: Add Windowed KTable API in StreamsBuilder

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Boyang,

what is the status of this KIP?

-Matthias

On 6/17/18 9:21 PM, Guozhang Wang wrote:
> Thanks for the KIP, Boyang. I made a pass over the KIP and the PR and have
> some comments:
> 
> 1. About the public API, I agree with Matthias that we can consider
> exposing the `innerDeserializer` and `innerSerializer` in the
> `TimeWindowed`/`SessionWindowed` serializers/deserializers, and the `innerSerde`
> in `WrapperSerde`, so that users can still pass in a `WindowedSerde<T>` into
> `Materialized` and `Consumed`. Then the public API could be:
> 
> ```
> (final String topic,
>  final Consumed<Windowed<K>, V> consumed,
>  final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
> 
> (final String topic, final Consumed<Windowed<K>, V> consumed);
> 
> (final String topic,
>  final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
> ```
> 
> 2. There is another WIP interface change that introduces a `WindowedKTable<K, V>`
> as an alias of `KTable<Windowed<K>, V>`, for the different purpose of adding
> some functions that are only allowed for windowed tables. I'm wondering whether
> this interface class could let us work around the Java "method has same erasure"
> error and keep the same function name. This is just a wild thought, and I think
> if we end up adding `Windowed` to the parameters, the signature may not matter
> anyway.
> 
> 
> 3. This is just a question about your use case: it seems that in your scenario
> you will materialize the window store twice in your topology. The first time is
> when you generate the windowed KTable from a windowed aggregation
> operator: the aggregation result, i.e. the `KTable<Windowed<K>, V>`, is already
> materialized into a store. The second time is when you pipe the changelog of this
> windowed KTable through an intermediate topic and read from this topic to
> form a `KTable<Windowed<K>, V>`: you will materialize this store again, and
> the two materialized state stores will contain exactly the same data.
> Have you thought about whether you really need to materialize it twice?
> 
> 
> Guozhang


Re: [DISCUSSION] KIP-300: Add Windowed KTable API in StreamsBuilder

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the KIP, Boyang. I made a pass over the KIP and the PR and have
some comments:

1. About the public API, I agree with Matthias that we can consider
exposing the `innerDeserializer` and `innerSerializer` in the
`TimeWindowed`/`SessionWindowed` serializers/deserializers, and the `innerSerde`
in `WrapperSerde`, so that users can still pass in a `WindowedSerde<T>` into
`Materialized` and `Consumed`. Then the public API could be:

```
(final String topic,
 final Consumed<Windowed<K>, V> consumed,
 final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);

(final String topic, final Consumed<Windowed<K>, V> consumed);

(final String topic,
 final Materialized<Windowed<K>, V, WindowStore<Bytes, byte[]>> materialized);
```
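A dependency-free sketch of point 1, assuming simplified stand-in types: `SimpleSerde`, `WindowedKey`, `WindowedSerde` and the helper `keySerdeFrom` are all hypothetical and are not the Kafka Streams `Serde`/`WindowedSerdes` classes. The wrapper serde exposes the key serde it wraps, so a builder handed only the windowed serde via `Consumed` could recover the plain key serde instead of asking the user for it a second time.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical stand-ins for illustration only; these are not the Kafka Streams
// Serde / WindowedSerdes classes.
public class InnerSerdeSketch {

    interface SimpleSerde<T> {
        byte[] serialize(T value);
        T deserialize(byte[] bytes);
    }

    // Stand-in for Windowed<K>: a key plus its window start.
    static final class WindowedKey<K> {
        final K key;
        final long windowStartMs;

        WindowedKey(K key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    static final SimpleSerde<String> STRING_SERDE = new SimpleSerde<String>() {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    };

    // Wrapper serde for windowed keys that exposes the key serde it wraps.
    static final class WindowedSerde<K> implements SimpleSerde<WindowedKey<K>> {
        private final SimpleSerde<K> inner;

        WindowedSerde(SimpleSerde<K> inner) {
            this.inner = inner;
        }

        // The accessor under discussion: lets callers recover the plain key serde.
        SimpleSerde<K> innerSerde() {
            return inner;
        }

        @Override
        public byte[] serialize(WindowedKey<K> value) {
            byte[] keyBytes = inner.serialize(value.key);
            return ByteBuffer.allocate(keyBytes.length + 8).put(keyBytes).putLong(value.windowStartMs).array();
        }

        @Override
        public WindowedKey<K> deserialize(byte[] bytes) {
            K key = inner.deserialize(Arrays.copyOf(bytes, bytes.length - 8));
            long start = ByteBuffer.wrap(bytes).getLong(bytes.length - 8);
            return new WindowedKey<>(key, start);
        }
    }

    // What a builder could do when it only receives the windowed key serde.
    @SuppressWarnings("unchecked")
    static <K> SimpleSerde<K> keySerdeFrom(SimpleSerde<WindowedKey<K>> windowedKeySerde) {
        if (windowedKeySerde instanceof WindowedSerde) {
            return ((WindowedSerde<K>) windowedKeySerde).innerSerde();
        }
        throw new IllegalArgumentException("cannot derive the key serde; pass it explicitly");
    }

    public static void main(String[] args) {
        WindowedSerde<String> windowedSerde = new WindowedSerde<>(STRING_SERDE);
        SimpleSerde<String> keySerde = keySerdeFrom(windowedSerde);
        WindowedKey<String> roundTrip =
                windowedSerde.deserialize(windowedSerde.serialize(new WindowedKey<>("user-42", 1_000L)));
        System.out.println(keySerde == STRING_SERDE);
        System.out.println(roundTrip.key + " @ " + roundTrip.windowStartMs);
    }
}
```

Falling back to an exception when the serde is not the wrapper mirrors the idea that users would only need to pass the key serde explicitly in that case.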

2. There is another WIP interface change that introduces a `WindowedKTable<K, V>`
as an alias of `KTable<Windowed<K>, V>`, for the different purpose of adding
some functions that are only allowed for windowed tables. I'm wondering whether
this interface class could let us work around the Java "method has same erasure"
error and keep the same function name. This is just a wild thought, and I think
if we end up adding `Windowed` to the parameters, the signature may not matter
anyway.
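A small Java sketch of the erasure issue in point 2, using hypothetical stand-in types (`Consumed`, `Windowed`, `Table` and `WindowedTable` here are not the Kafka Streams classes). Two overloads whose parameters differ only in their type arguments erase to the same signature, and javac rejects them regardless of return type, so a `WindowedKTable`-style return type alone would likely not help. Adding a parameter (here a window size) does change the erased signature.

```java
// Hypothetical stand-ins for illustration; these are not the Kafka Streams classes.
public class ErasureSketch {

    static final class Consumed<K, V> {}

    static final class Windowed<K> {}

    static class Table<K, V> {
        final String sourceTopic;

        Table(String sourceTopic) {
            this.sourceTopic = sourceTopic;
        }
    }

    // The "alias" idea: a table type whose keys are windowed.
    static final class WindowedTable<K, V> extends Table<Windowed<K>, V> {
        final long windowSizeMs;

        WindowedTable(String sourceTopic, long windowSizeMs) {
            super(sourceTopic);
            this.windowSizeMs = windowSizeMs;
        }
    }

    static <K, V> Table<K, V> table(String topic, Consumed<K, V> consumed) {
        return new Table<>(topic);
    }

    // Does NOT compile if uncommented: both overloads erase to table(String, Consumed),
    // and the different (more specific) return type does not distinguish them.
    // static <K, V> WindowedTable<K, V> table(String topic, Consumed<Windowed<K>, V> consumed) {
    //     return new WindowedTable<>(topic, 0L);
    // }

    // Compiles: the extra window-size parameter changes the erased signature.
    static <K, V> WindowedTable<K, V> table(String topic, long windowSizeMs, Consumed<Windowed<K>, V> consumed) {
        return new WindowedTable<>(topic, windowSizeMs);
    }

    public static void main(String[] args) {
        Table<String, Long> plain = table("plain-topic", new Consumed<String, Long>());
        WindowedTable<String, Long> windowed =
                table("windowed-topic", 60_000L, new Consumed<Windowed<String>, Long>());
        System.out.println(plain.sourceTopic + " / " + windowed.sourceTopic + " size=" + windowed.windowSizeMs);
    }
}
```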


3. This is just a question about your use case: it seems that in your scenario
you will materialize the window store twice in your topology. The first time is
when you generate the windowed KTable from a windowed aggregation
operator: the aggregation result, i.e. the `KTable<Windowed<K>, V>`, is already
materialized into a store. The second time is when you pipe the changelog of this
windowed KTable through an intermediate topic and read from this topic to
form a `KTable<Windowed<K>, V>`: you will materialize this store again, and
the two materialized state stores will contain exactly the same data.
Have you thought about whether you really need to materialize it twice?


Guozhang



-- 
-- Guozhang