You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Adam Bellemare <ad...@gmail.com> on 2018/06/18 20:15:05 UTC

[DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Hi All

I created KIP-314 and I would like to initiate a discussion on it.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join

The primary goal of this KIP is to improve the way that Kafka can deal with
relational data at scale. This KIP would alter the way that GlobalKTables
can be used in relation to KTables. I believe that this would be a very
useful change but I need some eyes on the technical aspects to validate or
refute the strategy.

Thanks

Adam Bellemare

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Moved this KIP into status "inactive". Feel free to resume and any time.

-Matthias

On 7/1/18 9:47 PM, Guozhang Wang wrote:
> Hello Adam,
> 
> Sorry for being late on this thread. I've read through your updated wiki
> and here are some thoughts:
> 
> * I agree with your assessed impediments. In fact, today although the
> Global KTable have its own checkpoint files, and restoration process,
> during its restoration it will always try to bootstrap the backing global
> store up to the Kafka topic's end of offset, before starting any
> processing. And hence on each machine node the checkpoint offsets  For the
> proposed change that Global KTable also needs to trigger joins, it means
> its restoration process would not fit as well. On the other hand, even for
> Global KTable - Stream joins we do not have such guarantees either: imagine
> if there is a crash, or even graceful shutdown scenario, when the task is
> back online and Global KTable is bootstrapped, it does not guarantee to be
> at the same offset position when it has stopped anyways. So I think one can
> argue that users of Global KTable - KTable join should not expect this
> semantics either.
> 
> * The other issue, which I mentioned above, is that the updates of the
> joins triggered by the Global KTable, and hence executed by the global
> thread, now needs to be propagated into the downstream operators, and more
> important following the order of the join: i.e. if there is a record coming
> from Global KTable, and then later another record coming from the other
> KTable. It means that then the global thread and the stream thread needs to
> be synchronized (note that today these threads are totally in parallel to
> each other).
> 
> 
> With that, I think we can 1) continue working on KIP-213 for local KTable
> joins, and 2) continue this KIP for Global KTable - KTable joins, while
> educating users that similar to Global KTable - KStream joins, the global
> ktable will not Global KTable trigger joins.
> 
> Guozhang
> 
> 
> On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare <ad...@gmail.com>
> wrote:
> 
>> Thanks for your help so far guys.
>>
>> While I do think that I have a fairly reasonable way forward for
>> restructuring the topologies and threads, there is, unfortunately, what I
>> believe is a fatal flaw that cannot be easily resolved. I have updated the
>> page (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
>> ) with the impediments to the solution, all of which revolve around
>> ensuring that data consistency is maintained. It seems to me that
>> GlobalKTables are not the way forward here and that I may be best
>> redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/
>> confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).
>>
>> I would appreciate being proven wrong on my impediment listings, but if
>> there are no other ideas I think we should close this KIP and the
>> associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
>> simply performed by a stream to GKT join with a groupbyKey and reduce to
>> form a state-store, so I would see no need to keep it open otherwise
>> (unless just for the shorthand notation).
>>
>> Thanks again
>>
>> Adam
>>
>> On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Hello Adam,
>>>
>>> Please see my comments inline.
>>>
>>> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <
>> adam.bellemare@gmail.com>
>>> wrote:
>>>
>>>> Hi Guozhang
>>>>
>>>> *Re: Questions*
>>>> *1)* I do not yet have a solution to this, but I also did not look that
>>>> closely at it when I begun this KIP. I admit that I was unaware of
>>> exactly
>>>> how the GlobalKTable worked alongside the KTable/KStream topologies.
>> You
>>>> mention "It means the two topologies will be merged, and that merged
>>>> topology can only be executed as a single task, by a single thread. " -
>>> is
>>>> the problem here that the merged topology would be parallelized to
>> other
>>>> threads/instances? While I am becoming familiar with how the topologies
>>> are
>>>> created under the hood, I am not yet fully clear on the implications of
>>>> your statement. I will look into this further.
>>>>
>>>>
>>> Yes. The issue is that today each task is executed by a single thread
>> only
>>> at any given time, and hence any state stores are only accessed by a
>> single
>>> thread (except for interactive queries, and for global tables where the
>>> global update thread write to the global store, and the local thread read
>>> from the global store), if we let the global store update thread to be
>> also
>>> triggering joins and puts send the results into the downstream operators,
>>> then it means that the global store update thread can access on any state
>>> stores in the subsequent part of the topology, breaking our current
>>> threading model.
>>>
>>>
>>>> *2)* " do you mean that although we have a duplicated state store:
>>>> ModifiedEvents in addition to the original Events with only the
>> enhanced
>>>> key, this is not avoidable anyways even if we do re-keying?" Yes, that
>> is
>>>> correct, that is what I meant. I need to improve my knowledge around
>> this
>>>> component too. I have been browsing the KIP-213 discussion thread and
>>>> looking at Jan's code
>>>>
>>>> *Re: Comments*
>>>> *1) *Makes sense. I will update the diagram accordingly. Thanks!
>>>>
>>>> *2)* Wouldn't outer join require that we emit records from the right
>>>> GlobalKTable that have no match in the left KTable? This seems
>> undefined
>>> to
>>>> me with the current proposal (above issues aside), since multiple
>> threads
>>>> would be producing the same output event for a single GlobalKTable
>>> update.
>>>>
>>>>
>>> I was considering mainly about the semantics of table-table joins, that
>>> whether we should add this operator inside our API. Implementation wise,
>> we
>>> will only have one global store update thread per instance, so there will
>>> not be multiple threads producing the same output, but still there would
>> be
>>> other issues that we should consider indeed, as mentioned above. Again
>> this
>>> comment is not about implementations, but API wise if it is desirable to
>>> add it.
>>>
>>>
>>>>
>>>> Questions for you both:
>>>> Q1) Is a KTable always materialized? I am looking at the code under the
>>>> hood, and it seems to me that it's either materialized with an explicit
>>>> Materialized object, or it's given an anonymous name and the default
>>> serdes
>>>> are used. Am I correct in this observation?
>>>>
>>>>
>>> A KTable is not always materialized. For example, a KTable generated from
>>> `KTable#filter` or `KTable#mapValues` does not create a new materialized
>>> state store, but we use the caller `KTable` 's state store for anyone who
>>> wants to query it in joins.
>>>
>>> Moving forward, we are also trying to optimize the topology to only
>>> "logically" materialize a KTable when necessary, this is summarized in
>>> https://issues.apache.org/jira/browse/KAFKA-6761
>>>
>>>
>>>>
>>>> Thanks,
>>>> Adam
>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
> 
> 


Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Adam,

Sorry for being late on this thread. I've read through your updated wiki
and here are some thoughts:

* I agree with your assessed impediments. In fact, today although the
Global KTable have its own checkpoint files, and restoration process,
during its restoration it will always try to bootstrap the backing global
store up to the Kafka topic's end of offset, before starting any
processing. And hence on each machine node the checkpoint offsets  For the
proposed change that Global KTable also needs to trigger joins, it means
its restoration process would not fit as well. On the other hand, even for
Global KTable - Stream joins we do not have such guarantees either: imagine
if there is a crash, or even graceful shutdown scenario, when the task is
back online and Global KTable is bootstrapped, it does not guarantee to be
at the same offset position when it has stopped anyways. So I think one can
argue that users of Global KTable - KTable join should not expect this
semantics either.

* The other issue, which I mentioned above, is that the updates of the
joins triggered by the Global KTable, and hence executed by the global
thread, now needs to be propagated into the downstream operators, and more
important following the order of the join: i.e. if there is a record coming
from Global KTable, and then later another record coming from the other
KTable. It means that then the global thread and the stream thread needs to
be synchronized (note that today these threads are totally in parallel to
each other).


With that, I think we can 1) continue working on KIP-213 for local KTable
joins, and 2) continue this KIP for Global KTable - KTable joins, while
educating users that similar to Global KTable - KStream joins, the global
ktable will not Global KTable trigger joins.

Guozhang


On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare <ad...@gmail.com>
wrote:

> Thanks for your help so far guys.
>
> While I do think that I have a fairly reasonable way forward for
> restructuring the topologies and threads, there is, unfortunately, what I
> believe is a fatal flaw that cannot be easily resolved. I have updated the
> page (
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> ) with the impediments to the solution, all of which revolve around
> ensuring that data consistency is maintained. It seems to me that
> GlobalKTables are not the way forward here and that I may be best
> redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).
>
> I would appreciate being proven wrong on my impediment listings, but if
> there are no other ideas I think we should close this KIP and the
> associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
> simply performed by a stream to GKT join with a groupbyKey and reduce to
> form a state-store, so I would see no need to keep it open otherwise
> (unless just for the shorthand notation).
>
> Thanks again
>
> Adam
>
> On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > Please see my comments inline.
> >
> > On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <
> adam.bellemare@gmail.com>
> > wrote:
> >
> > > Hi Guozhang
> > >
> > > *Re: Questions*
> > > *1)* I do not yet have a solution to this, but I also did not look that
> > > closely at it when I begun this KIP. I admit that I was unaware of
> > exactly
> > > how the GlobalKTable worked alongside the KTable/KStream topologies.
> You
> > > mention "It means the two topologies will be merged, and that merged
> > > topology can only be executed as a single task, by a single thread. " -
> > is
> > > the problem here that the merged topology would be parallelized to
> other
> > > threads/instances? While I am becoming familiar with how the topologies
> > are
> > > created under the hood, I am not yet fully clear on the implications of
> > > your statement. I will look into this further.
> > >
> > >
> > Yes. The issue is that today each task is executed by a single thread
> only
> > at any given time, and hence any state stores are only accessed by a
> single
> > thread (except for interactive queries, and for global tables where the
> > global update thread write to the global store, and the local thread read
> > from the global store), if we let the global store update thread to be
> also
> > triggering joins and puts send the results into the downstream operators,
> > then it means that the global store update thread can access on any state
> > stores in the subsequent part of the topology, breaking our current
> > threading model.
> >
> >
> > > *2)* " do you mean that although we have a duplicated state store:
> > > ModifiedEvents in addition to the original Events with only the
> enhanced
> > > key, this is not avoidable anyways even if we do re-keying?" Yes, that
> is
> > > correct, that is what I meant. I need to improve my knowledge around
> this
> > > component too. I have been browsing the KIP-213 discussion thread and
> > > looking at Jan's code
> > >
> > > *Re: Comments*
> > > *1) *Makes sense. I will update the diagram accordingly. Thanks!
> > >
> > > *2)* Wouldn't outer join require that we emit records from the right
> > > GlobalKTable that have no match in the left KTable? This seems
> undefined
> > to
> > > me with the current proposal (above issues aside), since multiple
> threads
> > > would be producing the same output event for a single GlobalKTable
> > update.
> > >
> > >
> > I was considering mainly about the semantics of table-table joins, that
> > whether we should add this operator inside our API. Implementation wise,
> we
> > will only have one global store update thread per instance, so there will
> > not be multiple threads producing the same output, but still there would
> be
> > other issues that we should consider indeed, as mentioned above. Again
> this
> > comment is not about implementations, but API wise if it is desirable to
> > add it.
> >
> >
> > >
> > > Questions for you both:
> > > Q1) Is a KTable always materialized? I am looking at the code under the
> > > hood, and it seems to me that it's either materialized with an explicit
> > > Materialized object, or it's given an anonymous name and the default
> > serdes
> > > are used. Am I correct in this observation?
> > >
> > >
> > A KTable is not always materialized. For example, a KTable generated from
> > `KTable#filter` or `KTable#mapValues` does not create a new materialized
> > state store, but we use the caller `KTable` 's state store for anyone who
> > wants to query it in joins.
> >
> > Moving forward, we are also trying to optimize the topology to only
> > "logically" materialize a KTable when necessary, this is summarized in
> > https://issues.apache.org/jira/browse/KAFKA-6761
> >
> >
> > >
> > > Thanks,
> > > Adam
> > >
> > >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Adam Bellemare <ad...@gmail.com>.
Thanks for your help so far guys.

While I do think that I have a fairly reasonable way forward for
restructuring the topologies and threads, there is, unfortunately, what I
believe is a fatal flaw that cannot be easily resolved. I have updated the
page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
) with the impediments to the solution, all of which revolve around
ensuring that data consistency is maintained. It seems to me that
GlobalKTables are not the way forward here and that I may be best
redirecting my efforts towards KIP-213 ( https://cwiki.apache.org/
confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable ).

I would appreciate being proven wrong on my impediment listings, but if
there are no other ideas I think we should close this KIP and the
associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
simply performed by a stream to GKT join with a groupbyKey and reduce to
form a state-store, so I would see no need to keep it open otherwise
(unless just for the shorthand notation).

Thanks again

Adam

On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Adam,
>
> Please see my comments inline.
>
> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <ad...@gmail.com>
> wrote:
>
> > Hi Guozhang
> >
> > *Re: Questions*
> > *1)* I do not yet have a solution to this, but I also did not look that
> > closely at it when I begun this KIP. I admit that I was unaware of
> exactly
> > how the GlobalKTable worked alongside the KTable/KStream topologies. You
> > mention "It means the two topologies will be merged, and that merged
> > topology can only be executed as a single task, by a single thread. " -
> is
> > the problem here that the merged topology would be parallelized to other
> > threads/instances? While I am becoming familiar with how the topologies
> are
> > created under the hood, I am not yet fully clear on the implications of
> > your statement. I will look into this further.
> >
> >
> Yes. The issue is that today each task is executed by a single thread only
> at any given time, and hence any state stores are only accessed by a single
> thread (except for interactive queries, and for global tables where the
> global update thread write to the global store, and the local thread read
> from the global store), if we let the global store update thread to be also
> triggering joins and puts send the results into the downstream operators,
> then it means that the global store update thread can access on any state
> stores in the subsequent part of the topology, breaking our current
> threading model.
>
>
> > *2)* " do you mean that although we have a duplicated state store:
> > ModifiedEvents in addition to the original Events with only the enhanced
> > key, this is not avoidable anyways even if we do re-keying?" Yes, that is
> > correct, that is what I meant. I need to improve my knowledge around this
> > component too. I have been browsing the KIP-213 discussion thread and
> > looking at Jan's code
> >
> > *Re: Comments*
> > *1) *Makes sense. I will update the diagram accordingly. Thanks!
> >
> > *2)* Wouldn't outer join require that we emit records from the right
> > GlobalKTable that have no match in the left KTable? This seems undefined
> to
> > me with the current proposal (above issues aside), since multiple threads
> > would be producing the same output event for a single GlobalKTable
> update.
> >
> >
> I was considering mainly about the semantics of table-table joins, that
> whether we should add this operator inside our API. Implementation wise, we
> will only have one global store update thread per instance, so there will
> not be multiple threads producing the same output, but still there would be
> other issues that we should consider indeed, as mentioned above. Again this
> comment is not about implementations, but API wise if it is desirable to
> add it.
>
>
> >
> > Questions for you both:
> > Q1) Is a KTable always materialized? I am looking at the code under the
> > hood, and it seems to me that it's either materialized with an explicit
> > Materialized object, or it's given an anonymous name and the default
> serdes
> > are used. Am I correct in this observation?
> >
> >
> A KTable is not always materialized. For example, a KTable generated from
> `KTable#filter` or `KTable#mapValues` does not create a new materialized
> state store, but we use the caller `KTable` 's state store for anyone who
> wants to query it in joins.
>
> Moving forward, we are also trying to optimize the topology to only
> "logically" materialize a KTable when necessary, this is summarized in
> https://issues.apache.org/jira/browse/KAFKA-6761
>
>
> >
> > Thanks,
> > Adam
> >
> >
>
> --
> -- Guozhang
>

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Adam,

Please see my comments inline.

On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <ad...@gmail.com>
wrote:

> Hi Guozhang
>
> *Re: Questions*
> *1)* I do not yet have a solution to this, but I also did not look that
> closely at it when I begun this KIP. I admit that I was unaware of exactly
> how the GlobalKTable worked alongside the KTable/KStream topologies. You
> mention "It means the two topologies will be merged, and that merged
> topology can only be executed as a single task, by a single thread. " - is
> the problem here that the merged topology would be parallelized to other
> threads/instances? While I am becoming familiar with how the topologies are
> created under the hood, I am not yet fully clear on the implications of
> your statement. I will look into this further.
>
>
Yes. The issue is that today each task is executed by a single thread only
at any given time, and hence any state stores are only accessed by a single
thread (except for interactive queries, and for global tables where the
global update thread write to the global store, and the local thread read
from the global store), if we let the global store update thread to be also
triggering joins and puts send the results into the downstream operators,
then it means that the global store update thread can access on any state
stores in the subsequent part of the topology, breaking our current
threading model.


> *2)* " do you mean that although we have a duplicated state store:
> ModifiedEvents in addition to the original Events with only the enhanced
> key, this is not avoidable anyways even if we do re-keying?" Yes, that is
> correct, that is what I meant. I need to improve my knowledge around this
> component too. I have been browsing the KIP-213 discussion thread and
> looking at Jan's code
>
> *Re: Comments*
> *1) *Makes sense. I will update the diagram accordingly. Thanks!
>
> *2)* Wouldn't outer join require that we emit records from the right
> GlobalKTable that have no match in the left KTable? This seems undefined to
> me with the current proposal (above issues aside), since multiple threads
> would be producing the same output event for a single GlobalKTable update.
>
>
I was considering mainly about the semantics of table-table joins, that
whether we should add this operator inside our API. Implementation wise, we
will only have one global store update thread per instance, so there will
not be multiple threads producing the same output, but still there would be
other issues that we should consider indeed, as mentioned above. Again this
comment is not about implementations, but API wise if it is desirable to
add it.


>
> Questions for you both:
> Q1) Is a KTable always materialized? I am looking at the code under the
> hood, and it seems to me that it's either materialized with an explicit
> Materialized object, or it's given an anonymous name and the default serdes
> are used. Am I correct in this observation?
>
>
A KTable is not always materialized. For example, a KTable generated from
`KTable#filter` or `KTable#mapValues` does not create a new materialized
state store, but we use the caller `KTable` 's state store for anyone who
wants to query it in joins.

Moving forward, we are also trying to optimize the topology to only
"logically" materialize a KTable when necessary, this is summarized in
https://issues.apache.org/jira/browse/KAFKA-6761


>
> Thanks,
> Adam
>
>

-- 
-- Guozhang

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Adam Bellemare <ad...@gmail.com>.
Hi Guozhang

*Re: Questions*
*1)* I do not yet have a solution to this, but I also did not look that
closely at it when I begun this KIP. I admit that I was unaware of exactly
how the GlobalKTable worked alongside the KTable/KStream topologies. You
mention "It means the two topologies will be merged, and that merged
topology can only be executed as a single task, by a single thread. " - is
the problem here that the merged topology would be parallelized to other
threads/instances? While I am becoming familiar with how the topologies are
created under the hood, I am not yet fully clear on the implications of
your statement. I will look into this further.

*2)* " do you mean that although we have a duplicated state store:
ModifiedEvents in addition to the original Events with only the enhanced
key, this is not avoidable anyways even if we do re-keying?" Yes, that is
correct, that is what I meant. I need to improve my knowledge around this
component too. I have been browsing the KIP-213 discussion thread and
looking at Jan's code

*Re: Comments*
*1) *Makes sense. I will update the diagram accordingly. Thanks!

*2)* Wouldn't outer join require that we emit records from the right
GlobalKTable that have no match in the left KTable? This seems undefined to
me with the current proposal (above issues aside), since multiple threads
would be producing the same output event for a single GlobalKTable update.


Questions for you both:
Q1) Is a KTable always materialized? I am looking at the code under the
hood, and it seems to me that it's either materialized with an explicit
Materialized object, or it's given an anonymous name and the default serdes
are used. Am I correct in this observation?


Thanks,
Adam



On Wed, Jun 20, 2018 at 6:44 PM, Guozhang Wang <wa...@gmail.com> wrote:

> Hello Adam,
>
> Thanks for proposing the KIP. A few meta comments:
>
> 1. As Matthias mentioned, the current GlobalKTable is designed to be
> read-only, and not driving any computations (btw the global store backing a
> GlobalKTable should also be read-only). Behind the scene the global store
> updating task and the regular streams task are two separate ones running
> two separate processor topologies by two threads: the global store updating
> task's topology is simply a source node, plus a processor node (let's call
> it the update-processor) that puts to the store. If we allow the
> GlobalKTable to drive the join, then we need the underlying global store's
> update processor to link to the downstream processors of the normal regular
> task's topology in order to pass the joined results to downstream. It means
> the two topologies will be merged, and that merged topology can only be
> executed as a single task, by a single thread. We need to think of a way
> how to work around this issue first of all before proceeding to next steps.
>
> 2. Not clear what do you mean by "In terms of data complexity, any pattern
> that requires us to rekey the data once is equivalent in terms of data
> capacity requirements.." do you mean that although we have a duplicated
> state store: ModifiedEvents in addition to the original Events with only
> the enhanced key, this is not avoidable anyways even if we do re-keying?
> Note that in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 213+Support+non-key+joining+in+KTable?preview=/74684836/
> 74687529/Screenshot%20from%202017-11-18%2023%3A26%3A52.png
> we were considering if it is still possible to only materialize the joining
> tables once each still, i.e. not having a duplicated store. So I think it
> is not necessarily the case that we have to duplicate the KTable's store.
>
>
> One minor comment:
>
> 1. In `*KTable as Driver, joined on GlobalKTable join mechanism`* section,
> I think we still need to join the old value with the global store to form a
> pair of "<new / old>" joined result, so that the resulting KTable can still
> be applied in another aggregation operator that allows correct addition /
> subtraction logic.
>
> 2. For KTable-KTable join, we have inner / left / outer, while for
> KStream-KTable / GlobalKTable join we only have inner / left, and the
> reason is that for stream-table joins outer join makes less sense; should
> we consider outer for KTable-GlobalKTable join as well?
>
>
> Guozhang
>
>
> On Tue, Jun 19, 2018 at 10:27 AM, Adam Bellemare <adam.bellemare@gmail.com
> >
> wrote:
>
> > Matthias
> >
> > Thanks for the links. I have seen those before but I will dig deeper into
> > them, especially around the CombinedKey and the flush + cache + rangescan
> > functionality. I believe Jan had a PR with many of the changes in there,
> > perhaps I can use some of the work that was done there to help leverage a
> > similar (or identical) design.
> >
> > I will certainly be able to make a PoC before going to vote on this one.
> It
> > is a larger change and I suspect that we will need to review some of the
> > finer points to ensure that the design is still suitable and sufficiently
> > performant. I'll post back when I have something more concrete, but in
> the
> > meantime I welcome all other concerns and comments.
> >
> > Thanks
> >
> >
> >
> > On Mon, Jun 18, 2018 at 10:05 PM, Matthias J. Sax <matthias@confluent.io
> >
> > wrote:
> >
> > > Adam,
> > >
> > > thanks a lot for the KIP. I agree that this would be a valuable feature
> > > to add. It's a very complex one though. You correctly pointed out, that
> > > the GlobalKTable (or global stores in general) cannot be the "driver"
> > > atm and are passively updated only. This is by design. Are you familiar
> > > with the KIP discussion of KIP-99?
> > > (https://cwiki.apache.org/confluence/pages/viewpage.
> > action?pageId=67633649
> > > )
> > > Would be worth to refresh to understand the tradeoffs and design
> > decisions.
> > >
> > > It's unclear to me, what the impact will be if we want to change the
> > > current design. Even if no GlobalKTable is used, it might have impact
> on
> > > performance and for sure on code complexity. Overall, it seems that a
> > > POC might be required before we can consider adding this (with the
> > > danger, that it does not get accepted in the end).
> > >
> > > Are you aware of KIP-213:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 213+Support+non-key+joining+in+KTable
> > >
> > > It suggest to add non-key joins and a lot of issues how to implement
> > > this were discussed already. As a KTable-GloblKTable join is a non-key
> > > join, too, it seems that those discussion apply to your KIP too.
> > >
> > > Hope this helps to make the next steps.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 6/18/18 1:15 PM, Adam Bellemare wrote:
> > > > Hi All
> > > >
> > > > I created KIP-314 and I would like to initiate a discussion on it.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> > > >
> > > > The primary goal of this KIP is to improve the way that Kafka can
> deal
> > > with
> > > > relational data at scale. This KIP would alter the way that
> > GlobalKTables
> > > > can be used in relation to KTables. I believe that this would be a
> very
> > > > useful change but I need some eyes on the technical aspects to
> validate
> > > or
> > > > refute the strategy.
> > > >
> > > > Thanks
> > > >
> > > > Adam Bellemare
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Adam,

Thanks for proposing the KIP. A few meta comments:

1. As Matthias mentioned, the current GlobalKTable is designed to be
read-only, and not driving any computations (btw the global store backing a
GlobalKTable should also be read-only). Behind the scene the global store
updating task and the regular streams task are two separate ones running
two separate processor topologies by two threads: the global store updating
task's topology is simply a source node, plus a processor node (let's call
it the update-processor) that puts to the store. If we allow the
GlobalKTable to drive the join, then we need the underlying global store's
update processor to link to the downstream processors of the normal regular
task's topology in order to pass the joined results to downstream. It means
the two topologies will be merged, and that merged topology can only be
executed as a single task, by a single thread. We need to think of a way
how to work around this issue first of all before proceeding to next steps.

2. Not clear what do you mean by "In terms of data complexity, any pattern
that requires us to rekey the data once is equivalent in terms of data
capacity requirements.." do you mean that although we have a duplicated
state store: ModifiedEvents in addition to the original Events with only
the enhanced key, this is not avoidable anyways even if we do re-keying?
Note that in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable?preview=/74684836/74687529/Screenshot%20from%202017-11-18%2023%3A26%3A52.png
we were considering if it is still possible to only materialize the joining
tables once each still, i.e. not having a duplicated store. So I think it
is not necessarily the case that we have to duplicate the KTable's store.


One minor comment:

1. In `*KTable as Driver, joined on GlobalKTable join mechanism`* section,
I think we still need to join the old value with the global store to form a
pair of "<new / old>" joined result, so that the resulting KTable can still
be applied in another aggregation operator that allows correct addition /
subtraction logic.

2. For KTable-KTable join, we have inner / left / outer, while for
KStream-KTable / GlobalKTable join we only have inner / left, and the
reason is that for stream-table joins outer join makes less sense; should
we consider outer for KTable-GlobalKTable join as well?


Guozhang


On Tue, Jun 19, 2018 at 10:27 AM, Adam Bellemare <ad...@gmail.com>
wrote:

> Matthias
>
> Thanks for the links. I have seen those before but I will dig deeper into
> them, especially around the CombinedKey and the flush + cache + rangescan
> functionality. I believe Jan had a PR with many of the changes in there,
> perhaps I can use some of the work that was done there to help leverage a
> similar (or identical) design.
>
> I will certainly be able to make a PoC before going to vote on this one. It
> is a larger change and I suspect that we will need to review some of the
> finer points to ensure that the design is still suitable and sufficiently
> performant. I'll post back when I have something more concrete, but in the
> meantime I welcome all other concerns and comments.
>
> Thanks
>
>
>
> On Mon, Jun 18, 2018 at 10:05 PM, Matthias J. Sax <ma...@confluent.io>
> wrote:
>
> > Adam,
> >
> > thanks a lot for the KIP. I agree that this would be a valuable feature
> > to add. It's a very complex one though. You correctly pointed out, that
> > the GlobalKTable (or global stores in general) cannot be the "driver"
> > atm and are passively updated only. This is by design. Are you familiar
> > with the KIP discussion of KIP-99?
> > (https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=67633649
> > )
> > Would be worth to refresh to understand the tradeoffs and design
> decisions.
> >
> > It's unclear to me, what the impact will be if we want to change the
> > current design. Even if no GlobalKTable is used, it might have impact on
> > performance and for sure on code complexity. Overall, it seems that a
> > POC might be required before we can consider adding this (with the
> > danger, that it does not get accepted in the end).
> >
> > Are you aware of KIP-213:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 213+Support+non-key+joining+in+KTable
> >
> > It suggest to add non-key joins and a lot of issues how to implement
> > this were discussed already. As a KTable-GloblKTable join is a non-key
> > join, too, it seems that those discussion apply to your KIP too.
> >
> > Hope this helps to make the next steps.
> >
> >
> > -Matthias
> >
> >
> > On 6/18/18 1:15 PM, Adam Bellemare wrote:
> > > Hi All
> > >
> > > I created KIP-314 and I would like to initiate a discussion on it.
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> > >
> > > The primary goal of this KIP is to improve the way that Kafka can deal
> > with
> > > relational data at scale. This KIP would alter the way that
> GlobalKTables
> > > can be used in relation to KTables. I believe that this would be a very
> > > useful change but I need some eyes on the technical aspects to validate
> > or
> > > refute the strategy.
> > >
> > > Thanks
> > >
> > > Adam Bellemare
> > >
> >
> >
>



-- 
-- Guozhang

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by Adam Bellemare <ad...@gmail.com>.
Matthias

Thanks for the links. I have seen those before but I will dig deeper into
them, especially around the CombinedKey and the flush + cache + rangescan
functionality. I believe Jan had a PR with many of the changes in there,
perhaps I can use some of the work that was done there to help leverage a
similar (or identical) design.

I will certainly be able to make a PoC before going to vote on this one. It
is a larger change and I suspect that we will need to review some of the
finer points to ensure that the design is still suitable and sufficiently
performant. I'll post back when I have something more concrete, but in the
meantime I welcome all other concerns and comments.

Thanks



On Mon, Jun 18, 2018 at 10:05 PM, Matthias J. Sax <ma...@confluent.io>
wrote:

> Adam,
>
> thanks a lot for the KIP. I agree that this would be a valuable feature
> to add. It's a very complex one though. You correctly pointed out, that
> the GlobalKTable (or global stores in general) cannot be the "driver"
> atm and are passively updated only. This is by design. Are you familiar
> with the KIP discussion of KIP-99?
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> )
> Would be worth to refresh to understand the tradeoffs and design decisions.
>
> It's unclear to me, what the impact will be if we want to change the
> current design. Even if no GlobalKTable is used, it might have impact on
> performance and for sure on code complexity. Overall, it seems that a
> POC might be required before we can consider adding this (with the
> danger, that it does not get accepted in the end).
>
> Are you aware of KIP-213:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 213+Support+non-key+joining+in+KTable
>
> It suggest to add non-key joins and a lot of issues how to implement
> this were discussed already. As a KTable-GloblKTable join is a non-key
> join, too, it seems that those discussion apply to your KIP too.
>
> Hope this helps to make the next steps.
>
>
> -Matthias
>
>
> On 6/18/18 1:15 PM, Adam Bellemare wrote:
> > Hi All
> >
> > I created KIP-314 and I would like to initiate a discussion on it.
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> >
> > The primary goal of this KIP is to improve the way that Kafka can deal
> with
> > relational data at scale. This KIP would alter the way that GlobalKTables
> > can be used in relation to KTables. I believe that this would be a very
> > useful change but I need some eyes on the technical aspects to validate
> or
> > refute the strategy.
> >
> > Thanks
> >
> > Adam Bellemare
> >
>
>

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Adam,

thanks a lot for the KIP. I agree that this would be a valuable feature
to add. It's a very complex one though. You correctly pointed out, that
the GlobalKTable (or global stores in general) cannot be the "driver"
atm and are passively updated only. This is by design. Are you familiar
with the KIP discussion of KIP-99?
(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649)
Would be worth to refresh to understand the tradeoffs and design decisions.

It's unclear to me, what the impact will be if we want to change the
current design. Even if no GlobalKTable is used, it might have impact on
performance and for sure on code complexity. Overall, it seems that a
POC might be required before we can consider adding this (with the
danger, that it does not get accepted in the end).

Are you aware of KIP-213:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable

It suggest to add non-key joins and a lot of issues how to implement
this were discussed already. As a KTable-GloblKTable join is a non-key
join, too, it seems that those discussion apply to your KIP too.

Hope this helps to make the next steps.


-Matthias


On 6/18/18 1:15 PM, Adam Bellemare wrote:
> Hi All
> 
> I created KIP-314 and I would like to initiate a discussion on it.
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
> 
> The primary goal of this KIP is to improve the way that Kafka can deal with
> relational data at scale. This KIP would alter the way that GlobalKTables
> can be used in relation to KTables. I believe that this would be a very
> useful change but I need some eyes on the technical aspects to validate or
> refute the strategy.
> 
> Thanks
> 
> Adam Bellemare
>