You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Jan Filipiak (JIRA)" <ji...@apache.org> on 2016/07/01 07:28:11 UTC

[jira] [Commented] (KAFKA-3705) Support non-key joining in KTable

    [ https://issues.apache.org/jira/browse/KAFKA-3705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15358566#comment-15358566 ] 

Jan Filipiak commented on KAFKA-3705:
-------------------------------------

Hi, yes that is kinda where I am coming from. I completely understand where you are. 
Doing the change log case ( logging Change<> objects) is just one implementation of this repartitioning and mine is another one. I am very familiar with my approach as I wrote some Samza apps using this approach. It has many benefits that may or may not be of interest. (repartition-topics can also be used to bootstrap, fewer copies of the data (no need to make state HA, see previous) etc.). What we are still missing here is a mutual understanding of what I think keywidening does and how to expose that to users in a non insane manner.

Maybe I try it with your Json syntax. This is the very example we have and where this tickets feature would allow me to build it in the dsl level of the api.

So lets say I have 3 tables. A, B, C, i want to reach a point where I have C => <C,List<Join<A,B>> this will then be read by our application servers and servers them as a faster way to retrieves this than lets say the original mysql. B has foreign keys in A and C.

 
All tables start of as one topic. keyed by this tables primary key
Topic mysq__jadajadajada_A
A.PK => A
Topic mysq_B
B.PK => B
Topic mysq_C
C.PK => C

I am going to repartition B to A.PK now. In the first example without a widened key.
Then it stays B.PK => B but partitioned by A.PK accordingly.

then I can do the join with A and get
B.PK => joined<B,A>

as of your previous comment:
{quote}
Then a join result of
{a="a1", joined = join("a1-pre", "c1")} 
{quote}
Note the Key stays B.PK (unwindened).
Now I am going to repartition based on C.PK still maintaining
B.PK => joined<B,A>
as the topic layout. 
Now, shit hits the fan. As I am doing my aggregation to become 
C,PK => List<Joined<A,B>>

How would this aggregator looks now?

{code:java}
List<Joined<A,B>> apply(B key, Joined<A,B> value, List<Joined<A,B>> current)
{
   Map m = listToMap(current, bKeyExtractorValueMapper<List<Joined<A,B>,B.PK>);
   if(value == null)
   { 
      m.remove(key)
   }else
   {
     m.put(key,value)
   }
   return m.entrySet.asList

}
{code}

This wouldn't be much different with logged Changes<Joined<A,B>> only the remove and add would be to methods. The problem is, that it doesn't
look wrong. But this code now has race conditions. Think about an update to the A.PK field of a B record that forces it to switch partitions.
(the C.PK value remains) then we publish a delete to the old partition and the new value to the new partition. Then we do the join. then we repartition on the non changed C.PK. This will make out code above see B.PK => null /remove B.PK => Joined<A,B> /add in no particular order. Hence the output is undefined. If we had forcefully by api widened the key to be Joined<A.PK,B.PK> the error would not happen and users would be aware of what happens on repartitioning. I thought this through and it also happens with logging Change<>, as it is really just another implementation.

I hope this finally clarifies that key widening I am talking about. If not, maybe we should have a small skype or something. 
My recommendation is further to not implement this joins as logged Changes<> as it is just more resource intensive and less efficient also making the api more complicated.

PS.: Hive has seen all join types with MapJoins, Skewed Joines, you name it. all these are applicable to streams aswell. Maybe have them in the back of your head.







> Support non-key joining in KTable
> ---------------------------------
>
>                 Key: KAFKA-3705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Liquan Pei
>              Labels: api
>             Fix For: 0.10.1.0
>
>
> Today in Kafka Streams DSL, KTable joins are only based on keys. If users want to join a KTable A by key {{a}} with another KTable B by key {{b}} but with a "foreign key" {{a}}, and assuming they are read from two topics which are partitioned on {{a}} and {{b}} respectively, they need to do the following pattern:
> {code}
> tableB' = tableB.groupBy(/* select on field "a" */).agg(...); // now tableB' is partitioned on "a"
> tableA.join(tableB', joiner);
> {code}
> Even if these two tables are read from two topics which are already partitioned on {{a}}, users still need to do the pre-aggregation in order to make the two joining streams to be on the same key. This is a draw-back from programability and we should fix it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)