Posted to dev@kafka.apache.org by "Matthias J. Sax" <ma...@confluent.io> on 2019/11/25 00:54:05 UTC

Re: [DISCUSS] - KIP-314: KTable to GlobalKTable Bi-directional Join

Moved this KIP into status "inactive". Feel free to resume at any time.

-Matthias

On 7/1/18 9:47 PM, Guozhang Wang wrote:
> Hello Adam,
> 
> Sorry for being late on this thread. I've read through your updated wiki
> and here are some thoughts:
> 
> * I agree with your assessed impediments. In fact, today although the
> Global KTable has its own checkpoint files and restoration process,
> during its restoration it will always try to bootstrap the backing global
> store up to the Kafka topic's end offset, before starting any
> processing. And hence on each machine node the checkpoint offsets  For the
> proposed change, in which the Global KTable also needs to trigger joins,
> this means its restoration process would not fit as well. On the other
> hand, even for Global KTable - Stream joins we do not have such guarantees
> either: imagine a crash, or even a graceful shutdown. When the task is
> back online and the Global KTable is bootstrapped, it is not guaranteed to
> be at the same offset position as when it stopped. So I think one can
> argue that users of Global KTable - KTable joins should not expect these
> semantics either.
> 
> * The other issue, which I mentioned above, is that the updates of the
> joins triggered by the Global KTable, and hence executed by the global
> thread, now need to be propagated to the downstream operators and, more
> importantly, must follow the order of the join: i.e. if a record arrives
> from the Global KTable and a later record arrives from the other KTable,
> their join results must be emitted in that order. This means that the
> global thread and the stream thread need to be synchronized (note that
> today these threads run completely in parallel to each other).
> 
> 
> With that, I think we can 1) continue working on KIP-213 for local KTable
> joins, and 2) continue this KIP for Global KTable - KTable joins, while
> educating users that, similar to Global KTable - KStream joins, the Global
> KTable will not trigger joins.
> 
> Guozhang
> 
> 
> On Mon, Jun 25, 2018 at 11:05 AM, Adam Bellemare <ad...@gmail.com>
> wrote:
> 
>> Thanks for your help so far guys.
>>
>> While I do think that I have a fairly reasonable way forward for
>> restructuring the topologies and threads, there is, unfortunately, what I
>> believe is a fatal flaw that cannot be easily resolved. I have updated the
>> page (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-314%3A+KTable+to+GlobalKTable+Bi-directional+Join
>> ) with the impediments to the solution, all of which revolve around
>> ensuring that data consistency is maintained. It seems to me that
>> GlobalKTables are not the way forward here and that I may be best
>> redirecting my efforts towards KIP-213 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable
>> ).
>>
>> I would appreciate being proven wrong on my impediment listings, but if
>> there are no other ideas I think we should close this KIP and the
>> associated JIRA. A KTable to GlobalKTable join driven just by the KTable is
>> simply performed by a stream to GlobalKTable join followed by a groupByKey
>> and reduce to form a state store, so I would see no need to keep it open
>> otherwise (unless just for the shorthand notation).
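A minimal sketch of the equivalence described above, written as a plain-Java model rather than with the Kafka Streams DSL. The class `TableDrivenJoin` and its methods are hypothetical names for illustration only. It assumes the left record's key is also the lookup key into the global store, and it ignores tombstones. Left-side updates look up the global store and the latest joined value per key is kept (the "reduce" step), while global-side updates only change the lookup store and emit nothing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model (no Kafka dependency); all names here are hypothetical.
final class TableDrivenJoin {
    // Stand-in for the GlobalKTable's store: fully replicated, lookup-only.
    final Map<String, String> globalStore = new HashMap<>();
    // Stand-in for the state store formed by groupByKey + reduce.
    final Map<String, String> resultStore = new HashMap<>();
    private final BiFunction<String, String, String> joiner;

    TableDrivenJoin(BiFunction<String, String, String> joiner) {
        this.joiner = joiner;
    }

    // A global-side update only changes the lookup store; it triggers no join.
    void onGlobalUpdate(String key, String value) {
        globalStore.put(key, value);
    }

    // A left-side update performs an inner-join lookup against the global
    // store; the "reduce" keeps only the latest joined value per key.
    void onTableUpdate(String key, String value) {
        String right = globalStore.get(key);
        if (right != null) {
            resultStore.put(key, joiner.apply(value, right));
        }
    }
}
```

In this model a joined value stays stale after a global-side change until the left side emits the same key again, which is the one-directional behaviour the paragraph above refers to.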
>>
>> Thanks again
>>
>> Adam
>>
>> On Fri, Jun 22, 2018 at 9:00 PM, Guozhang Wang <wa...@gmail.com> wrote:
>>
>>> Hello Adam,
>>>
>>> Please see my comments inline.
>>>
>>> On Thu, Jun 21, 2018 at 8:14 AM, Adam Bellemare <adam.bellemare@gmail.com>
>>> wrote:
>>>
>>>> Hi Guozhang
>>>>
>>>> *Re: Questions*
>>>> *1)* I do not yet have a solution to this, but I also did not look that
>>>> closely at it when I began this KIP. I admit that I was unaware of exactly
>>>> how the GlobalKTable worked alongside the KTable/KStream topologies. You
>>>> mention "It means the two topologies will be merged, and that merged
>>>> topology can only be executed as a single task, by a single thread." - is
>>>> the problem here that the merged topology would be parallelized to other
>>>> threads/instances? While I am becoming familiar with how the topologies are
>>>> created under the hood, I am not yet fully clear on the implications of
>>>> your statement. I will look into this further.
>>>>
>>>>
>>> Yes. The issue is that today each task is executed by a single thread only
>>> at any given time, and hence any state store is only accessed by a single
>>> thread (except for interactive queries, and for global tables, where the
>>> global update thread writes to the global store and the local thread reads
>>> from it). If we let the global store update thread also trigger joins and
>>> send the results to the downstream operators, then the global store update
>>> thread could access any state store in the subsequent part of the
>>> topology, breaking our current threading model.
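To make the single-owner assumption concrete, here is a small plain-Java sketch. The class `OwnedStore` and its runtime ownership check are hypothetical and purely illustrative; this is not how Kafka Streams enforces its threading model. The store remembers the thread that created it and rejects access from any other thread, which is the kind of violation a join-triggering global update thread would introduce.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "one state store, one owning thread"; not Kafka code.
final class OwnedStore {
    // The thread that created the store stands in for the owning stream thread.
    private final Thread owner = Thread.currentThread();
    private final Map<String, String> data = new HashMap<>();

    // Illustrative guard: fail fast when a foreign thread touches the store.
    private void checkOwner() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("store accessed by non-owning thread");
        }
    }

    void put(String key, String value) {
        checkOwner();
        data.put(key, value);
    }

    String get(String key) {
        checkOwner();
        return data.get(key);
    }
}
```

In this model, a second thread standing in for the global update thread gets an `IllegalStateException` as soon as it writes to a downstream store it does not own.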
>>>
>>>
>>>> *2)* "do you mean that although we have a duplicated state store:
>>>> ModifiedEvents in addition to the original Events with only the enhanced
>>>> key, this is not avoidable anyways even if we do re-keying?" Yes, that is
>>>> correct, that is what I meant. I need to improve my knowledge around this
>>>> component too. I have been browsing the KIP-213 discussion thread and
>>>> looking at Jan's code.
>>>>
>>>> *Re: Comments*
>>>> *1) *Makes sense. I will update the diagram accordingly. Thanks!
>>>>
>>>> *2)* Wouldn't outer join require that we emit records from the right
>>>> GlobalKTable that have no match in the left KTable? This seems undefined to
>>>> me with the current proposal (above issues aside), since multiple threads
>>>> would be producing the same output event for a single GlobalKTable update.
>>>>
>>>>
>>> I was mainly considering the semantics of table-table joins, that is,
>>> whether we should add this operator to our API. Implementation-wise, we
>>> will only have one global store update thread per instance, so there will
>>> not be multiple threads producing the same output, but there would still be
>>> other issues that we should consider, as mentioned above. Again, this
>>> comment is not about implementation, but about whether it is desirable
>>> API-wise to add it.
>>>
>>>
>>>>
>>>> Questions for you both:
>>>> Q1) Is a KTable always materialized? I am looking at the code under the
>>>> hood, and it seems to me that it's either materialized with an explicit
>>>> Materialized object, or it's given an anonymous name and the default serdes
>>>> are used. Am I correct in this observation?
>>>>
>>>>
>>> A KTable is not always materialized. For example, a KTable generated from
>>> `KTable#filter` or `KTable#mapValues` does not create a new materialized
>>> state store; instead we use the caller `KTable`'s state store for anyone who
>>> wants to query it in joins.
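The idea of a derived table without its own store can be sketched in plain Java. `MaterializedTable` and `TableView` are hypothetical names for this illustration and do not reflect the actual Kafka Streams internals. The derived view holds no data: every lookup reads the parent's store and applies the mapper on the fly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical read-only view: a derived table with no store of its own.
interface TableView<R> {
    R get(String key);
}

// Hypothetical plain-Java model; not the Kafka Streams implementation.
final class MaterializedTable {
    // The only physical store; derived views read through to it.
    final Map<String, Integer> store = new HashMap<>();

    // Returns a view that applies the mapper at lookup time instead of
    // copying mapped values into a second store.
    <R> TableView<R> mapValues(Function<Integer, R> mapper) {
        return key -> {
            Integer value = store.get(key);
            return value == null ? null : mapper.apply(value);
        };
    }
}
```

Because the view reads through to the parent, later updates to the parent's store are visible through the view without any extra copy. The cost is that the mapper runs again on every lookup.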
>>>
>>> Moving forward, we are also trying to optimize the topology to only
>>> "logically" materialize a KTable when necessary; this is summarized in
>>> https://issues.apache.org/jira/browse/KAFKA-6761
>>>
>>>
>>>>
>>>> Thanks,
>>>> Adam
>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
> 
>