Posted to dev@kafka.apache.org by Damian Guy <da...@gmail.com> on 2017/01/03 09:13:54 UTC

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Thanks Guozhang - i'll remove the joins.

I agree we need to refactor TopologyBuilder, but I think we'll need another
KIP for that.

Thanks,
Damian

On Fri, 30 Dec 2016 at 01:32 Guozhang Wang <wa...@gmail.com> wrote:

> 1/2: Sounds good, let's remove the joins within KGlobalTable for now.
>
> 3. I see, makes sense.
>
> Unfortunately since TopologyBuilder is a public class we cannot separate
> its internal-usage-only functions like build / buildWithGlobalTables / etc
> from other user functions like stream / table / etc. We need to consider
> refactoring this interface sooner rather than later.
>
> 4/6. OK.
>
>
> Guozhang
>
>
> On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your input. Answers below, but i'm thinking we should remove
> > joins from GlobalKTables for the time being and re-visit if necessary in
> > the future.
> >
> > 1. with a global table the joins are never really materialized (at least
> > how i see it), rather they are just views on the existing global tables.
> > I've deliberately taken this approach so we don't have to create yet
> > another State Store and changelog topic etc. These all consume resources
> > that i believe are unnecessary. So, i don't really see the point of having
> > a materialize method. Further, one of the major benefits of joining two
> > global tables is being able to query them via Interactive Queries. For this
> > you need the name, so i think it makes sense to provide it with the join.
> >
> > 2. This has been discussed already in this thread (with Michael), and
> > outerJoin is deliberately not part of the KIP. To be able to join both
> > ways, as you suggest, requires that both inputs are able to map to the same
> > key. This is not always going to be possible, i.e., relationships can be
> > one way, so for that reason i felt it was best to not go down that path as
> > we'd not be able to resolve it at the time that
> > globalTable.join(otherGlobalTable,...) was called, and this would result in
> > possible confusion. Also, to support this we'd need to physically
> > materialize a StateStore that represents the join (which i think is a waste
> > of resources), or, we'd need to provide another interface where we can map
> > from the key of the resulting global table to the keys of both of the
> > joined tables.
> >
> > 3. The intention is that the GlobalKTables are in a single topology that is
> > owned and updated by a single thread. So yes it is necessary that they can
> > be created separately.
> >
> > 4. Bootstrapping and maintaining of the state of GlobalKTables are done on
> > a single thread. This thread will run simultaneously with the current
> > StreamThreads. It doesn't make sense to move the bootstrapping of the
> > StandbyTasks to this thread as they are logically part of a StreamThread,
> > they are 'assigned' to the StreamThread. With GlobalKTables there is no
> > assignment as such, the thread just maintains all of them.
> >
> > 5. Yes i'll update the KIP - the state directory will be under the same
> > path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
> > directory, i.e., global_state, rather than being a task directory.
> >
> > 6. The whole point of GlobalKTables is to have a copy of ALL of the data on
> > each node. I don't think it makes sense to be able to reset the starting
> > position.
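
Points 4 and 6 above can be pictured with a minimal sketch. It assumes a plain
List stands in for a single-partition topic and a concurrent map stands in for
the global state store; GlobalStoreSketch, Record and poll() are hypothetical
names, not Kafka Streams classes, and treating a null value as a delete is an
assumption of this sketch. The store always reads from offset 0 and is written
by one maintaining thread only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Kafka code: a List stands in for a one-partition topic.
final class GlobalStoreSketch {

    static final class Record {
        final String key;
        final String value; // null is treated as a delete (an assumption of this sketch)

        Record(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Record> topic;
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private int nextOffset = 0; // always starts from the beginning; no reset option

    GlobalStoreSketch(List<Record> topic) {
        this.topic = topic;
    }

    /** Applies every record from nextOffset up to the current end; returns how many. */
    int poll() {
        int end = topic.size();
        int applied = 0;
        while (nextOffset < end) {
            Record r = topic.get(nextOffset++);
            if (r.value == null) {
                store.remove(r.key);
            } else {
                store.put(r.key, r.value);
            }
            applied++;
        }
        return applied;
    }

    /** Lookups may come from other threads; only the maintaining thread writes. */
    String get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Record> topic = new ArrayList<>();
        topic.add(new Record("a", "1"));
        topic.add(new Record("b", "2"));

        GlobalStoreSketch global = new GlobalStoreSketch(topic);

        // One dedicated thread bootstraps (and would keep polling in a real loop).
        Thread maintainer = new Thread(global::poll, "global-state-maintainer");
        maintainer.start();
        maintainer.join();

        System.out.println(global.get("a"));
        System.out.println(global.get("b"));
    }
}
```

Because there is no assignment step, every instance running this sketch would
end up with the same full copy of the data, which is the property point 6
relies on.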
> >
> > Thanks,
> > Damian
> >
> > On Tue, 20 Dec 2016 at 20:00 Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > One more thing to add:
> > >
> > > 6. For KGlobalTable, it is always bootstrapped from the beginning while for
> > > other KTables, we are enabling users to override their resetting position
> > > as in
> > >
> > > https://github.com/apache/kafka/pull/2007
> > >
> > > Should we consider doing the same for KGlobalTable as well?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wa...@gmail.com> wrote:
> > >
> > > > Thanks for the very well written proposal, and sorry for the very-late
> > > > review. I have a few comments here:
> > > >
> > > > 1. We are introducing a "queryableViewName" in the GlobalTable join
> > > > results, while I'm wondering if we should just add a more general function
> > > > like "materialize" to KTable and KGlobalTable with the name to be used in
> > > > queries?
> > > >
> > > > 2. For KGlobalTable's own "join" and "leftJoin": since we are only passing
> > > > the KeyValueMapper<K, V, K1> keyMapper it seems that for either case only
> > > > the left hand side will logically "trigger" the join, which is different to
> > > > KTable's join semantics. I'm wondering if it would be more consistent to
> > > > have them as:
> > > >
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
> > > >                                     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >                                     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >                                     final ValueJoiner<V, V1, R> joiner,
> > > >                                     final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
> > > >                                          final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >                                          final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >                                          final ValueJoiner<V, V1, R> joiner,
> > > >                                          final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
> > > >                                         final KeyValueMapper<K, V, K1> keyMapper,
> > > >                                         final ValueJoiner<V, V1, R> joiner,
> > > >                                         final String queryableViewName);
> > > >
> > > >
> > > > I.e. add another directional key mapper to join and also to outerJoin.
> > > >
> > > >
> > > > 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> > > > have a separate function from "TopologyBuilder.build" itself? With global
> > > > tables, are there any scenarios where we want to build the topology without
> > > > the embedded global tables (i.e. still calling "build")?
> > > >
> > > > 4. As for implementation, you mentioned that global table bootstrapping
> > > > will be done in another dedicated thread. Could we also consider moving the
> > > > logic of bootstrapping the standby-replica state stores into this thread as
> > > > well, which can then leverage the existing "restoreConsumer" that does
> > > > not participate in the consumer group protocol? By doing this I think we
> > > > can still avoid thread-synchronization while making the logic more clear
> > > > (ideally the standby restoration does not really need to be part of the
> > > > stream thread's main loops).
> > > >
> > > > 5. Also for the global table's state directory, I'm assuming it will not
> > > > be under the per-task directory as it is per instance. But could you
> > > > elaborate a bit in the wiki about its directory as well? Also could we
> > > > consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along
> > > > with this feature since we may need to change the directory path / storage
> > > > schema formats for these different types of stores moving forward.
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <da...@gmail.com> wrote:
> > > >
> > > >> Thanks for the update Michael.
> > > >>
> > > >> I just wanted to add that there is one crucial piece of information that
> > > >> i've failed to add (I apologise).
> > > >>
> > > >> To me, the join between 2 Global Tables just produces a view on top of the
> > > >> underlying tables (this is the same as it works for KTables today). So that
> > > >> means there is no Physical StateStore that backs the join result, it is
> > > >> just a Virtual StateStore that knows how to resolve the join when it is
> > > >> required. I've deliberately taken this path so that we don't end up having
> > > >> yet another copy of the data, stored on local disk, and sent to another
> > > >> change-log topic. This also reduces the memory overhead from creating
> > > >> RocksDBStores and reduces load on the Thread based caches we have. So it is
> > > >> a resource optimization.
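
The "Virtual StateStore" idea can be sketched roughly as follows, assuming two
in-memory maps stand in for the underlying global tables. JoinView and its
get() method are hypothetical names for illustration, not the KIP's API; the
view keeps no copy of the data and resolves the join only when a key is looked
up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration only; none of these names come from Kafka Streams.
final class JoinViewSketch {

    /** Read-only "virtual store": holds no data, resolves the join per lookup. */
    static final class JoinView<K, V, K1, V1, R> {
        private final Map<K, V> left;
        private final Map<K1, V1> right;
        private final BiFunction<K, V, K1> keyMapper; // left record -> right key
        private final BiFunction<V, V1, R> joiner;

        JoinView(Map<K, V> left, Map<K1, V1> right,
                 BiFunction<K, V, K1> keyMapper, BiFunction<V, V1, R> joiner) {
            this.left = left;
            this.right = right;
            this.keyMapper = keyMapper;
            this.joiner = joiner;
        }

        /** Inner-join semantics: null unless both sides have a match. */
        R get(K key) {
            V leftValue = left.get(key);
            if (leftValue == null) {
                return null;
            }
            V1 rightValue = right.get(keyMapper.apply(key, leftValue));
            if (rightValue == null) {
                return null;
            }
            return joiner.apply(leftValue, rightValue);
        }
    }

    public static void main(String[] args) {
        Map<Integer, String> orders = new HashMap<>();   // order id -> customer id
        Map<String, String> customers = new HashMap<>(); // customer id -> name
        orders.put(1, "c-7");
        customers.put("c-7", "Alice");

        JoinView<Integer, String, String, String, String> view =
            new JoinView<>(orders, customers, (k, v) -> v, (o, c) -> o + ":" + c);

        System.out.println(view.get(1));
        customers.put("c-7", "Alicia"); // the view reflects the change immediately
        System.out.println(view.get(1));
    }
}
```

Note that the view is keyed by the left table's key only, which is why this
shape works for join and leftJoin but not for an outer join.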
> > > >>
> > > >> So while it is technically possible to support outer joins, we would need
> > > >> to physically materialize the StateStore (and create a changelog-topic for
> > > >> it), or, we'd need to provide another interface where the user could map
> > > >> from the outerJoin key to both of the other table keys. This is because the
> > > >> key of the outerJoin table could be either the key of the lhs table, or the
> > > >> rhs tables, or something completely different.
> > > >>
> > > >> With this and what you have mentioned above in mind i think we should park
> > > >> outerJoin support for this KIP and re-visit if and when we need it in the
> > > >> future.
> > > >>
> > > >> I'll update the KIP with this.
> > > >>
> > > >> Thanks,
> > > >> Damian
> > > >>
> > > >> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mi...@confluent.io> wrote:
> > > >>
> > > >> > Damian and I briefly chatted offline (thanks, Damian!), and here's the
> > > >> > summary of my thoughts and conclusion.
> > > >> >
> > > >> > TL;DR: Let's skip outer join support for global tables.
> > > >> >
> > > >> > In more detail:
> > > >> >
> > > >> > - We agreed that, technically, we can add OUTER JOIN support.  However,
> > > >> > outer joins only work if certain preconditions are met.  The preconditions
> > > >> > are IMHO similar/the same as we have for the normal, partitioned KTables
> > > >> > (e.g. having matching keys and co-partitioned data for the tables), but in
> > > >> > the case of global tables the user would need to meet all these
> > > >> > preconditions in one big swing when specifying the params for the outer
> > > >> > join call.  Even so, you'd only know at run-time whether the preconditions
> > > >> > were actually met properly.
> > > >> >
> > > >> > - Hence it's quite likely that users will be confused about these
> > > >> > preconditions and how to meet them, and -- from what we can tell -- use
> > > >> > cases / user demand for outer joins have been rare.
> > > >> >
> > > >> > - So, long story short, even though we could add outer join support we'd
> > > >> > suggest skipping it for global tables.  If we subsequently learn that there
> > > >> > is a lot of user interest in that functionality, we still have the option
> > > >> > to add it in the future.
> > > >> >
> > > >> >
> > > >> > Best,
> > > >> > Michael
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <da...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi Michael,
> > > >> > >
> > > >> > > I don't see how that helps?
> > > >> > >
> > > >> > > Let's say we have tables Person(id, device_id, name, ...), Device(id,
> > > >> > > person_id, type, ...), and both are keyed with the same type. And we have a
> > > >> > > stream that, for the sake of simplicity, has both person_id and device_id
> > > >> > > (i know this is a bit contrived!)
> > > >> > > so our join
> > > >> > > person = builder.globalTable(...);
> > > >> > > device = builder.globalTable(...);
> > > >> > > personDevice = person.outerJoin(device, ...);
> > > >> > >
> > > >> > > someStream = builder.stream(..);
> > > >> > > // which id do i use to join with? person.id? device.id?
> > > >> > > someStream.leftJoin(personDevice, ...)
> > > >> > >
> > > >> > > // Interactive Query on the view generated by the join of person and device
> > > >> > > personDeviceStore = streams.store("personDevice",...);
> > > >> > > // person.id? device.id?
> > > >> > > personDeviceStore.get(someId);
> > > >> > >
> > > >> > > We get records
> > > >> > > person id=1, device_id=2 ,...
> > > >> > > device id=2, person_id=1, ...
> > > >> > > stream person_id = 1, device_id = 2
> > > >> > >
> > > >> > > We could do the join between the GlobalTables both ways as each side could
> > > >> > > map to the other side's key, but when i'm accessing the resulting table,
> > > >> > > personDevice, what is the key? The person.id? The device.id? It can't be
> > > >> > > both of them.
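
The ambiguity can be made concrete with plain maps, as a rough sketch rather
than anything from the KIP. naiveOuterJoin is a hypothetical helper that keys
each result row by whichever side produced it; the second person/device pair
is added here purely for illustration. Since both tables share one key type,
a lookup by person.id can silently return a row that was keyed by device.id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the key ambiguity; plain maps stand in for global tables.
final class OuterJoinKeySketch {

    /** Keys each joined row by the id of whichever side drove it. */
    static Map<Long, String> naiveOuterJoin(Map<Long, Long> personToDevice,
                                            Map<Long, Long> deviceToPerson) {
        Map<Long, String> result = new HashMap<>();
        // Rows driven by the person table are keyed by person.id ...
        for (Map.Entry<Long, Long> e : personToDevice.entrySet()) {
            result.put(e.getKey(), "person=" + e.getKey() + ",device=" + e.getValue());
        }
        // ... rows driven by the device table are keyed by device.id, which has
        // the same type, so they overwrite person-keyed rows with equal ids.
        for (Map.Entry<Long, Long> e : deviceToPerson.entrySet()) {
            result.put(e.getKey(), "person=" + e.getValue() + ",device=" + e.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> personToDevice = new HashMap<>();
        Map<Long, Long> deviceToPerson = new HashMap<>();
        personToDevice.put(1L, 2L); // person id=1, device_id=2 (from the example)
        deviceToPerson.put(2L, 1L); // device id=2, person_id=1 (from the example)
        personToDevice.put(2L, 1L); // added for illustration
        deviceToPerson.put(1L, 2L); // added for illustration

        // Looking up key 1 now returns the row for person 2, not person 1.
        System.out.println(naiveOuterJoin(personToDevice, deviceToPerson));
    }
}
```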
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Damian
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io> wrote:
> > > >> > >
> > > >> > > > The key type returned by both KeyValueMappers (in the current trunk
> > > >> > > > version, that type is named `R`) would need to be the same for this to
> > > >> > > > work.
> > > >> > > >
> > > >> > > >
> > > >> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <damian.guy@gmail.com> wrote:
> > > >> > > >
> > > >> > > > > Michael,
> > > >> > > > >
> > > >> > > > > We can only support outerJoin if both tables are keyed the same way. Let's
> > > >> > > > > say for example you can map both ways, however, the key for each table is
> > > >> > > > > of a different type. So t1 is long and t2 is string - what is the key type
> > > >> > > > > of the resulting GlobalKTable? So when you subsequently join to this table,
> > > >> > > > > and do a lookup on it, which key are you using?
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Damian
> > > >> > > > >
> > > >> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <michael@confluent.io> wrote:
> > > >> > > > >
> > > >> > > > > > Damian,
> > > >> > > > > >
> > > >> > > > > > yes, that makes sense.
> > > >> > > > > >
> > > >> > > > > > But I am still wondering:  In your example, there's no prior knowledge "can
> > > >> > > > > > I map from t1->t2" that Streams can leverage for joining t1 and t2 other
> > > >> > > > > > than blindly relying on the user to provide an appropriate KeyValueMapper
> > > >> > > > > > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user to
> > > >> > > > > > provide a KeyValueMapper from t1->t2 (Streams does not know at compile time
> > > >> > > > > > whether this mapping will actually work), then we can also allow the user
> > > >> > > > > > to provide a corresponding "reverse" mapper from t2->t1.  That is, we could
> > > >> > > > > > say that an outer join between two global tables IS supported, but if and
> > > >> > > > > > only if the user provides two KeyValueMappers, one for t1->t2 and one for
> > > >> > > > > > t2->t1.
> > > >> > > > > >
> > > >> > > > > > The left join t1->t2 (which is supported in the KIP), in general, works
> > > >> > > > > > only because of the existence of the user-provided KeyValueMapper from
> > > >> > > > > > t1->t2.  The outer join, as you point out, cannot be satisfied as easily
> > > >> > > > > > because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise the
> > > >> > > > > > outer join won't work.
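
A rough sketch of the two-mapper idea, assuming plain maps stand in for t1 and
t2. mappersAgree is a hypothetical helper, not a Kafka Streams method; it shows
that whether the forward and reverse mappers are consistent can only be checked
against actual data at run time, not at compile time.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration; BiFunction stands in for KeyValueMapper.
final class BidirectionalMapperSketch {

    /**
     * Returns true if, for every t1 row that finds a match in t2, mapping the
     * matched t2 row back yields the original t1 key.
     */
    static <K1, V1, K2, V2> boolean mappersAgree(Map<K1, V1> t1, Map<K2, V2> t2,
                                                 BiFunction<K1, V1, K2> t1ToT2,
                                                 BiFunction<K2, V2, K1> t2ToT1) {
        for (Map.Entry<K1, V1> e : t1.entrySet()) {
            K2 k2 = t1ToT2.apply(e.getKey(), e.getValue());
            V2 v2 = t2.get(k2);
            if (v2 == null) {
                continue; // no match on the right side; nothing to check
            }
            K1 back = t2ToT1.apply(k2, v2);
            if (!e.getKey().equals(back)) {
                return false; // the reverse mapper points at a different t1 row
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new HashMap<>(); // t1 key -> t2_id
        Map<String, Integer> t2 = new HashMap<>(); // t2 key -> t1 key (assumed field)
        t1.put(1, "a");
        t2.put("a", 1);
        System.out.println(mappersAgree(t1, t2, (k, v) -> v, (k, v) -> v));
    }
}
```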
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <damian.guy@gmail.com> wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Michael,
> > > >> > > > > > >
> > > >> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > >> > > > > > > t1{
> > > >> > > > > > >  int key;
> > > >> > > > > > >  string t2_id;
> > > >> > > > > > >  ...
> > > >> > > > > > > }
> > > >> > > > > > >
> > > >> > > > > > > t2 {
> > > >> > > > > > >   string key;
> > > >> > > > > > >   ..
> > > >> > > > > > > }
> > > >> > > > > > > If we create global tables out of these we'd get:
> > > >> > > > > > > GlobalKTable<Integer, ?> t1;
> > > >> > > > > > > GlobalKTable<String, ?> t2;
> > > >> > > > > > >
> > > >> > > > > > > So the join can only go in 1 direction, i.e., from t1 -> t2 as in order to
> > > >> > > > > > > perform the join we need to use a KeyValueMapper to extract the t2 key from
> > > >> > > > > > > the t1 value.
> > > >> > > > > > >
> > > >> > > > > > > Does that make sense?
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Damian
> > > >> > > > > > >
> > > >> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <michael@confluent.io> wrote:
> > > >> > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > > >> > > > > > > > > won't work the other way around.
> > > >> > > > > > > >
> > > >> > > > > > > > Care to elaborate why it won't work the other way around? If, for example,
> > > >> > > > > > > > we swapped the call from leftTable.join(rightTable) to
> > > >> > > > > > > > rightTable.join(leftTable), that join would work, too. Perhaps I am
> > > >> > > > > > > > missing something though. :-)
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <damian.guy@gmail.com> wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Matthias,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks for the feedback.
> > > >> > > > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > >> > > > > > > > > differently. So you need to use the key from the left side of the join
> > > >> > > > > > > > > along with the KeyValueMapper to resolve the right side of the join. This
> > > >> > > > > > > > > won't work the other way around.
> > > >> > > > > > > > >
> > > >> > > > > > > > > On the bootstrapping concern. If the application is failing before
> > > >> > > > > > > > > bootstrapping finishes, the problem is likely to be related to a terminal
> > > >> > > > > > > > > exception, i.e., running out of disk space, corrupt state stores etc. In
> > > >> > > > > > > > > these cases, we wouldn't want the application to continue. So i think this
> > > >> > > > > > > > > is ok.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks,
> > > >> > > > > > > > > Damian
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matthias@confluent.io> wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > A few comments:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >  - why is there no outer-join for GlobalKTables
> > > >> > > > > > > > > >  - on bootstrapping GlobalKTable, could it happen that this never
> > > >> > > > > > > > > > finishes if the application fails before bootstrapping finishes and new
> > > >> > > > > > > > > > data gets written at the same time? Do we need to guard against this
> > > >> > > > > > > > > > (seems to be a very rare corner case, so maybe not required)?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > -Matthias
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > >> > > > > > > > > > > Hi all,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Looking forward to your feedback.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > Damian
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > *Michael G. Noll*
> > > >> > > > > > Product Manager | Confluent
> > > >> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > > >> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> |
> > Blog
> > > >> > > > > > <http://www.confluent.io/blog>
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>