Posted to dev@kafka.apache.org by Damian Guy <da...@gmail.com> on 2016/12/06 10:09:40 UTC

[DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Hi all,

I would like to start the discussion on KIP-99:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649

Looking forward to your feedback.

Thanks,
Damian

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Thanks Guozhang - I'll remove the joins.

I agree we need to refactor TopologyBuilder, but I think we'll need another
KIP for that.

Thanks,
Damian

On Fri, 30 Dec 2016 at 01:32 Guozhang Wang <wa...@gmail.com> wrote:

> 1/2: Sounds good, let's remove the joins within KGlobalTable for now.
>
> 3. I see, makes sense.
>
> Unfortunately, since TopologyBuilder is a public class, we cannot separate
> its internal-use-only functions like build / buildWithGlobalTables / etc.
> from other user-facing functions like stream / table / etc. We need to
> consider refactoring this interface sooner rather than later.
>
> 4/6. OK.
>
>
> Guozhang
>
>
> On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your input. Answers below, but I'm thinking we should remove
> > joins from GlobalKTables for the time being and re-visit if necessary in
> > the future.
> >
> > 1. With a global table the joins are never really materialized (at least
> > how I see it); rather, they are just views on the existing global tables.
> > I've deliberately taken this approach so we don't have to create yet
> > another State Store and changelog topic etc. These all consume resources
> > that I believe are unnecessary. So, I don't really see the point of
> > having a materialize method. Further, one of the major benefits of
> > joining two global tables is being able to query them via Interactive
> > Queries. For this you need the name, so I think it makes sense to
> > provide it with the join.
> >
> > 2. This has been discussed already in this thread (with Michael), and
> > outerJoin is deliberately not part of the KIP. To be able to join both
> > ways, as you suggest, requires that both inputs are able to map to the
> > same key. This is not always going to be possible, i.e., relationships
> > can be one way, so for that reason I felt it was best not to go down
> > that path, as we'd not be able to resolve it at the time that
> > globalTable.join(otherGlobalTable,...) was called, and this would result
> > in possible confusion. Also, to support this we'd need to physically
> > materialize a StateStore that represents the join (which I think is a
> > waste of resources), or we'd need to provide another interface where we
> > can map from the key of the resulting global table to the keys of both
> > of the joined tables.
> >
> > 3. The intention is that the GlobalKTables are in a single topology that
> > is owned and updated by a single thread. So yes, it is necessary that
> > they can be created separately.
> >
> > 4. Bootstrapping and maintaining the state of GlobalKTables are done on
> > a single thread. This thread will run simultaneously with the current
> > StreamThreads. It doesn't make sense to move the bootstrapping of the
> > StandbyTasks to this thread, as they are logically part of a
> > StreamThread: they are 'assigned' to the StreamThread. With GlobalKTables
> > there is no assignment as such; the thread just maintains all of them.
> >
> > 5. Yes, I'll update the KIP - the state directory will be under the same
> > path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
> > directory, i.e., global_state, rather than being a task directory.
> >
> > 6. The whole point of GlobalKTables is to have a copy of ALL of the data
> > on each node. I don't think it makes sense to be able to reset the
> > starting position.
> >
> > Thanks,
> > Damian
> >
> > On Tue, 20 Dec 2016 at 20:00 Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > One more thing to add:
> > >
> > > 6. For KGlobalTable, it is always bootstrapped from the beginning,
> > > while for other KTables we are enabling users to override their
> > > resetting position, as in
> > >
> > > https://github.com/apache/kafka/pull/2007
> > >
> > > Should we consider doing the same for KGlobalTable as well?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the very well written proposal, and sorry for the
> > > > very late review. I have a few comments here:
> > > >
> > > > 1. We are introducing a "queryableViewName" in the GlobalTable join
> > > > results, while I'm wondering if we should just add a more general
> > > > function like "materialize" to KTable and KGlobalTable with the name
> > > > to be used in queries?
> > > >
> > > > 2. For KGlobalTable's own "join" and "leftJoin": since we are only
> > > > passing the KeyValueMapper<K, V, K1> keyMapper, it seems that in
> > > > either case only the left hand side will logically "trigger" the
> > > > join, which is different from KTable's join semantics. I'm wondering
> > > > if it would be more consistent to have them as:
> > > >
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> join(
> > > >     final GlobalKTable<K1, V1> other,
> > > >     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >     final ValueJoiner<V, V1, R> joiner,
> > > >     final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> outerJoin(
> > > >     final GlobalKTable<K1, V1> other,
> > > >     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > > >     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > > >     final ValueJoiner<V, V1, R> joiner,
> > > >     final String queryableViewName);
> > > >
> > > > <K1, V1, R> GlobalKTable<K, R> leftJoin(
> > > >     final GlobalKTable<K1, V1> other,
> > > >     final KeyValueMapper<K, V, K1> keyMapper,
> > > >     final ValueJoiner<V, V1, R> joiner,
> > > >     final String queryableViewName);
> > > >
> > > >
> > > > I.e. add another directional key mapper to join and also to
> > > > outerJoin.
> > > >
> > > >
> > > > 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> > > > have a separate function from "TopologyBuilder.build" itself? With
> > > > global tables, are there any scenarios in which we want to build the
> > > > topology without the embedded global tables (i.e. still calling
> > > > "build")?
> > > >
> > > > 4. As for implementation, you mentioned that global table
> > > > bootstrapping will be done in another dedicated thread. Could we also
> > > > consider moving the logic of bootstrapping the standby-replica state
> > > > stores into this thread as well, which can then leverage the existing
> > > > "restoreConsumer" that does not participate in the consumer group
> > > > protocol? By doing this I think we can still avoid
> > > > thread-synchronization while making the logic clearer (ideally the
> > > > standby restoration does not really need to be part of the stream
> > > > thread's main loop).
> > > >
> > > > 5. Also for the global table's state directory, I'm assuming it will
> > > > not be under the per-task directory as it is per instance. But could
> > > > you elaborate a bit in the wiki about its directory as well? Also,
> > > > could we consider adding
> > > > https://issues.apache.org/jira/browse/KAFKA-3522 along with this
> > > > feature, since we may need to change the directory path / storage
> > > > schema formats for these different types of stores moving forward?
> > > >
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <da...@gmail.com>
> > wrote:
> > > >
> > > >> Thanks for the update Michael.
> > > >>
> > > >> I just wanted to add that there is one crucial piece of information
> > > >> that I've failed to add (I apologise).
> > > >>
> > > >> To me, the join between 2 Global Tables just produces a view on top
> > > >> of the underlying tables (this is the same as it works for KTables
> > > >> today). So that means there is no Physical StateStore that backs the
> > > >> join result; it is just a Virtual StateStore that knows how to
> > > >> resolve the join when it is required. I've deliberately taken this
> > > >> path so that we don't end up having yet another copy of the data,
> > > >> stored on local disk, and sent to another change-log topic. This
> > > >> also reduces the memory overhead from creating RocksDBStores and
> > > >> reduces load on the Thread based caches we have. So it is a resource
> > > >> optimization.
> > > >>
> > > >> So while it is technically possible to support outer joins, we would
> > > >> need to physically materialize the StateStore (and create a
> > > >> changelog-topic for it), or we'd need to provide another interface
> > > >> where the user could map from the outerJoin key to both of the other
> > > >> table keys. This is because the key of the outerJoin table could be
> > > >> either the key of the lhs table, or the rhs table, or something
> > > >> completely different.
> > > >>
> > > >> With this and what you have mentioned above in mind, I think we
> > > >> should park outerJoin support for this KIP and re-visit if and when
> > > >> we need it in the future.
> > > >>
> > > >> I'll update the KIP with this.
> > > >>
> > > >> Thanks,
> > > >> Damian
> > > >>
> > > >> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mi...@confluent.io>
> > wrote:
> > > >>
> > > >> > Damian and I briefly chatted offline (thanks, Damian!), and here's
> > > >> > the summary of my thoughts and conclusion.
> > > >> >
> > > >> > TL;DR: Let's skip outer join support for global tables.
> > > >> >
> > > >> > In more detail:
> > > >> >
> > > >> > - We agreed that, technically, we can add OUTER JOIN support.
> > > >> > However, outer joins only work if certain preconditions are met.
> > > >> > The preconditions are IMHO similar/the same as we have for the
> > > >> > normal, partitioned KTables (e.g. having matching keys and
> > > >> > co-partitioned data for the tables), but in the case of global
> > > >> > tables the user would need to meet all these preconditions in one
> > > >> > big swing when specifying the params for the outer join call.
> > > >> > Even so, you'd only know at run-time whether the preconditions
> > > >> > were actually met properly.
> > > >> >
> > > >> > - Hence it's quite likely that users will be confused about these
> > > >> > preconditions and how to meet them, and -- from what we can tell
> > > >> > -- use cases / user demand for outer joins have been rare.
> > > >> >
> > > >> > - So, long story short, even though we could add outer join
> > > >> > support, we'd suggest skipping it for global tables. If we
> > > >> > subsequently learn that there is a lot of user interest in that
> > > >> > functionality, we still have the option to add it in the future.
> > > >> >
> > > >> >
> > > >> > Best,
> > > >> > Michael
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <da...@gmail.com>
> > > >> wrote:
> > > >> >
> > > >> > > Hi Michael,
> > > >> > >
> > > >> > > I don't see how that helps?
> > > >> > >
> > > >> > > Let's say we have tables Person(id, device_id, name, ...) and
> > > >> > > Device(id, person_id, type, ...), and both are keyed with the
> > > >> > > same type. And we have a stream that, for the sake of
> > > >> > > simplicity, has both person_id and device_id (I know this is a
> > > >> > > bit contrived!), so our join is:
> > > >> > > person = builder.globalTable(...);
> > > >> > > device = builder.globalTable(...);
> > > >> > > personDevice = builder.outerJoin(device, ...);
> > > >> > >
> > > >> > > someStream = builder.stream(..);
> > > >> > > // which id do i use to join with? person.id? device.id?
> > > >> > > someStream.leftJoin(personDevice, ...)
> > > >> > >
> > > >> > > // Interactive Query on the view generated by the join of person
> > > >> > > // and device
> > > >> > > personDeviceStore = streams.store("personDevice",...);
> > > >> > > // person.id? device.id?
> > > >> > > personDeviceStore.get(someId);
> > > >> > >
> > > >> > > We get records
> > > >> > > person id=1, device_id=2 ,...
> > > >> > > device id=2, person_id=1, ...
> > > >> > > stream person_id = 1, device_id = 2
> > > >> > >
> > > >> > > We could do the join between the GlobalTables both ways, as each
> > > >> > > side could map to the other side's key, but when I'm accessing
> > > >> > > the resulting table, personDevice, what is the key? The
> > > >> > > person.id? The device.id? It can't be both of them.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Damian
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > >
> > > >> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io>
> > > >> wrote:
> > > >> > >
> > > >> > > > The key type returned by both KeyValueMappers (in the current
> > > >> > > > trunk version, that type is named `R`) would need to be the
> > > >> > > > same for this to work.
> > > >> > > >
> > > >> > > >
> > > >> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <
> > damian.guy@gmail.com>
> > > >> > wrote:
> > > >> > > >
> > > >> > > > > Michael,
> > > >> > > > >
> > > >> > > > > We can only support outerJoin if both tables are keyed the
> > > >> > > > > same way. Let's say, for example, you can map both ways;
> > > >> > > > > however, the key for each table is of a different type. So
> > > >> > > > > t1 is long and t2 is string - what is the key type of the
> > > >> > > > > resulting GlobalKTable? So when you subsequently join to
> > > >> > > > > this table, and do a lookup on it, which key are you using?
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Damian
> > > >> > > > >
> > > >> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <
> > michael@confluent.io>
> > > >> > wrote:
> > > >> > > > >
> > > >> > > > > > Damian,
> > > >> > > > > >
> > > >> > > > > > yes, that makes sense.
> > > >> > > > > >
> > > >> > > > > > But I am still wondering: In your example, there's no prior
> > > >> > > > > > knowledge "can I map from t1->t2" that Streams can leverage
> > > >> > > > > > for joining t1 and t2 other than blindly relying on the
> > > >> > > > > > user to provide an appropriate KeyValueMapper for K1/V1 of
> > > >> > > > > > t1 -> K2/V2 of t2. In other words, if we allow the user to
> > > >> > > > > > provide a KeyValueMapper from t1->t2 (Streams does not know
> > > >> > > > > > at compile time whether this mapping will actually work),
> > > >> > > > > > then we can also allow the user to provide a corresponding
> > > >> > > > > > "reverse" mapper from t2->t1. That is, we could say that an
> > > >> > > > > > outer join between two global tables IS supported, but if
> > > >> > > > > > and only if the user provides two KeyValueMappers, one for
> > > >> > > > > > t1->t2 and one for t2->t1.
> > > >> > > > > >
> > > >> > > > > > The left join t1->t2 (which is supported in the KIP), in
> > > >> > > > > > general, works only because of the existence of the
> > > >> > > > > > user-provided KeyValueMapper from t1->t2. The outer join,
> > > >> > > > > > as you point out, cannot be satisfied as easily because
> > > >> > > > > > Streams must know two mappers, t1->t2 plus t2->t1 --
> > > >> > > > > > otherwise the outer join won't work.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <
> > > >> damian.guy@gmail.com>
> > > >> > > > wrote:
> > > >> > > > > >
> > > >> > > > > > > Hi Michael,
> > > >> > > > > > >
> > > >> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > >> > > > > > > t1{
> > > >> > > > > > >  int key;
> > > >> > > > > > >  string t2_id;
> > > >> > > > > > >  ...
> > > >> > > > > > > }
> > > >> > > > > > >
> > > >> > > > > > > t2 {
> > > >> > > > > > >   string key;
> > > >> > > > > > >   ..
> > > >> > > > > > > }
> > > >> > > > > > > If we create global tables out of these we'd get:
> > > >> > > > > > > GlobalKTable<Integer, ?> t1;
> > > >> > > > > > > GlobalKTable<String, ?> t2;
> > > >> > > > > > >
> > > >> > > > > > > So the join can only go in 1 direction, i.e., from t1 ->
> > > >> > > > > > > t2, as in order to perform the join we need to use a
> > > >> > > > > > > KeyValueMapper to extract the t2 key from the t1 value.
> > > >> > > > > > >
> > > >> > > > > > > Does that make sense?
> > > >> > > > > > >
> > > >> > > > > > > Thanks,
> > > >> > > > > > > Damian
> > > >> > > > > > >
> > > >> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <
> > > >> michael@confluent.io>
> > > >> > > > wrote:
> > > >> > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables
> > > >> > > > > > > > > may be keyed differently. So you need to use the key
> > > >> > > > > > > > > from the left side of the join along with the
> > > >> > > > > > > > > KeyValueMapper to resolve the right side of the join.
> > > >> > > > > > > > > This won't work the other way around.
> > > >> > > > > > > >
> > > >> > > > > > > > Care to elaborate why it won't work the other way
> > > >> > > > > > > > around? If, for example, we swapped the call from
> > > >> > > > > > > > leftTable.join(rightTable) to
> > > >> > > > > > > > rightTable.join(leftTable), that join would work, too.
> > > >> > > > > > > > Perhaps I am missing something though. :-)
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <
> > > >> > > damian.guy@gmail.com>
> > > >> > > > > > > wrote:
> > > >> > > > > > > >
> > > >> > > > > > > > > Hi Matthias,
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks for the feedback.
> > > >> > > > > > > > >
> > > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables
> > > >> > > > > > > > > may be keyed differently. So you need to use the key
> > > >> > > > > > > > > from the left side of the join along with the
> > > >> > > > > > > > > KeyValueMapper to resolve the right side of the join.
> > > >> > > > > > > > > This won't work the other way around.
> > > >> > > > > > > > >
> > > >> > > > > > > > > On the bootstrapping concern: if the application is
> > > >> > > > > > > > > failing before bootstrapping finishes, the problem is
> > > >> > > > > > > > > likely to be related to a terminal exception, e.g.,
> > > >> > > > > > > > > running out of disk space, corrupt state stores etc.
> > > >> > > > > > > > > In these cases, we wouldn't want the application to
> > > >> > > > > > > > > continue. So I think this is ok.
> > > >> > > > > > > > >
> > > >> > > > > > > > > Thanks,
> > > >> > > > > > > > > Damian
> > > >> > > > > > > > >
> > > >> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> > > >> > > > matthias@confluent.io
> > > >> > > > > >
> > > >> > > > > > > > wrote:
> > > >> > > > > > > > >
> > > >> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating
> > > >> > > > > > > > > > example!
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > A few comments:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >  - why is there no outer-join for GlobalKTables?
> > > >> > > > > > > > > >  - on bootstrapping GlobalKTable, could it happen
> > > >> > > > > > > > > > that this never finishes if the application fails
> > > >> > > > > > > > > > before bootstrapping finishes and new data gets
> > > >> > > > > > > > > > written at the same time? Do we need to guard
> > > >> > > > > > > > > > against this (seems to be a very rare corner case,
> > > >> > > > > > > > > > so maybe not required)?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > -Matthias
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > >> > > > > > > > > > > Hi all,
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Looking forward to your feedback.
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > > > Thanks,
> > > >> > > > > > > > > > > Damian
> > > >> > > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > --
> > > >> > > > > > *Michael G. Noll*
> > > >> > > > > > Product Manager | Confluent
> > > >> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > > >> > > > > >
> > > >> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> |
> > > >> > > > > > Blog <http://www.confluent.io/blog>
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
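
The "view" join described in the quoted discussion above (a virtual store
that resolves the join at lookup time instead of materializing a second
state store) can be modelled with a minimal plain-Java sketch. This is only
an illustration under stated assumptions, not the KIP's implementation: the
class JoinViewSketch, its get and example methods, and the sample data are
all hypothetical, and plain Maps stand in for the two global tables t1
(integer key, value holding the t2 id) and t2 (string key).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Plain-Java model of a non-materialized left-join "view" over two fully
 * replicated tables. All names are hypothetical; this is NOT the Kafka
 * Streams API.
 */
final class JoinViewSketch<K, V, K1, V1, R> {
    private final Map<K, V> left;                 // stands in for global table t1
    private final Map<K1, V1> right;              // stands in for global table t2
    private final BiFunction<K, V, K1> keyMapper; // maps a t1 record to a t2 key
    private final BiFunction<V, V1, R> joiner;    // combines the two values

    JoinViewSketch(Map<K, V> left, Map<K1, V1> right,
                   BiFunction<K, V, K1> keyMapper,
                   BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    /** Resolves the join on demand; the view is keyed by the LEFT key. */
    R get(K leftKey) {
        V leftValue = left.get(leftKey);
        if (leftValue == null) {
            return null; // no left record, so there is nothing to join
        }
        K1 rightKey = keyMapper.apply(leftKey, leftValue);
        V1 rightValue = right.get(rightKey); // null when t2 has no match
        return joiner.apply(leftValue, rightValue);
    }

    /** Builds a small t1 (int key) / t2 (string key) example. */
    static JoinViewSketch<Integer, String, String, String, String> example() {
        Map<Integer, String> t1 = new HashMap<>(); // value is the t2 id
        t1.put(1, "a");
        t1.put(3, "zz");
        Map<String, String> t2 = new HashMap<>();
        t2.put("a", "device-a");
        return new JoinViewSketch<Integer, String, String, String, String>(
                t1, t2,
                (key, value) -> value,      // extract the t2 key from the t1 value
                (v, v1) -> v + "/" + v1);   // join both values into one string
    }

    public static void main(String[] args) {
        JoinViewSketch<Integer, String, String, String, String> view = example();
        System.out.println(view.get(1)); // a/device-a
        System.out.println(view.get(3)); // zz/null (left record without a match)
        System.out.println(view.get(2)); // null (no left record)
    }
}
```

Note that the view can only be queried by t1's key. Looking it up by t2's
key would need a reverse mapper (or a materialized store), which is the
key ambiguity that led the thread to drop outerJoin.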

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
1/2: Sounds good, let's remove the joins within KGlobalTable for now.

3. I see, makes sense.

Unfortunately, since TopologyBuilder is a public class, we cannot separate
its internal-use-only functions like build / buildWithGlobalTables / etc.
from other user-facing functions like stream / table / etc. We need to
consider refactoring this interface sooner rather than later.

4/6. OK.


Guozhang


On Tue, Dec 20, 2016 at 2:16 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Guozhang,
>
> Thanks for your input. Answers below, but I'm thinking we should remove
> joins from GlobalKTables for the time being and re-visit if necessary in
> the future.
>
> 1. With a global table the joins are never really materialized (at least
> how I see it); rather, they are just views on the existing global tables.
> I've deliberately taken this approach so we don't have to create yet
> another State Store and changelog topic etc. These all consume resources
> that I believe are unnecessary. So, I don't really see the point of having
> a materialize method. Further, one of the major benefits of joining two
> global tables is being able to query them via Interactive Queries. For this
> you need the name, so I think it makes sense to provide it with the join.
>
> 2. This has been discussed already in this thread (with Michael), and
> outerJoin is deliberately not part of the KIP. To be able to join both
> ways, as you suggest, requires that both inputs are able to map to the same
> key. This is not always going to be possible, i.e., relationships can be
> one way, so for that reason I felt it was best not to go down that path, as
> we'd not be able to resolve it at the time that
> globalTable.join(otherGlobalTable,...) was called, and this would result
> in possible confusion. Also, to support this we'd need to physically
> materialize a StateStore that represents the join (which I think is a waste
> of resources), or we'd need to provide another interface where we can map
> from the key of the resulting global table to the keys of both of the
> joined tables.
>
> 3. The intention is that the GlobalKTables are in a single topology that is
> owned and updated by a single thread. So yes it is necessary that they can
> be created separately.
>
> 4. Bootstrapping and maintaining the state of GlobalKTables are done on
> a single thread. This thread will run simultaneously with the current
> StreamThreads. It doesn't make sense to move the bootstrapping of the
> StandbyTasks to this thread, as they are logically part of a StreamThread:
> they are 'assigned' to the StreamThread. With GlobalKTables there is no
> assignment as such; the thread just maintains all of them.
>
> 5. Yes, I'll update the KIP - the state directory will be under the same
> path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
> directory, i.e., global_state, rather than being a task directory.
>
> 6. The whole point of GlobalKTables is to have a copy of ALL of the data on
> each node. I don't think it makes sense to be able to reset the starting
> position.
>
> Thanks,
> Damian
>
> On Tue, 20 Dec 2016 at 20:00 Guozhang Wang <wa...@gmail.com> wrote:
>
> > One more thing to add:
> >
> > 6. For KGlobalTable, it is always bootstrapped from the beginning, while
> > for other KTables we are enabling users to override their resetting
> > position, as in
> >
> > https://github.com/apache/kafka/pull/2007
> >
> > Should we consider doing the same for KGlobalTable as well?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Thanks for the very well written proposal, and sorry for the very late
> > > review. I have a few comments here:
> > >
> > > 1. We are introducing a "queryableViewName" in the GlobalTable join
> > > results, while I'm wondering if we should just add a more general
> > > function like "materialize" to KTable and KGlobalTable with the name to
> > > be used in queries?
> > >
> > > 2. For KGlobalTable's own "join" and "leftJoin": since we are only
> > > passing the KeyValueMapper<K, V, K1> keyMapper, it seems that in either
> > > case only the left hand side will logically "trigger" the join, which is
> > > different from KTable's join semantics. I'm wondering if it would be
> > > more consistent to have them as:
> > >
> > >
> > > <K1, V1, R> GlobalKTable<K, R> join(
> > >     final GlobalKTable<K1, V1> other,
> > >     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > >     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > >     final ValueJoiner<V, V1, R> joiner,
> > >     final String queryableViewName);
> > >
> > > <K1, V1, R> GlobalKTable<K, R> outerJoin(
> > >     final GlobalKTable<K1, V1> other,
> > >     final KeyValueMapper<K, V, K1> leftkeyMapper,
> > >     final KeyValueMapper<K1, V1, K> rightkeyMapper,
> > >     final ValueJoiner<V, V1, R> joiner,
> > >     final String queryableViewName);
> > >
> > > <K1, V1, R> GlobalKTable<K, R> leftJoin(
> > >     final GlobalKTable<K1, V1> other,
> > >     final KeyValueMapper<K, V, K1> keyMapper,
> > >     final ValueJoiner<V, V1, R> joiner,
> > >     final String queryableViewName);
> > >
> > >
> > > I.e. add another directional key mapper to join and also to outerJoin.
> > >
> > >
> > > 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to
> > > have a separate function from "TopologyBuilder.build" itself? With
> > > global tables, are there any scenarios in which we want to build the
> > > topology without the embedded global tables (i.e. still calling
> > > "build")?
> > >
> > > 4. As for implementation, you mentioned that global table bootstrapping
> > > will be done in another dedicated thread. Could we also consider moving
> > > the logic of bootstrapping the standby-replica state stores into this
> > > thread as well, which can then leverage the existing "restoreConsumer"
> > > that does not participate in the consumer group protocol? By doing this
> > > I think we can still avoid thread-synchronization while making the logic
> > > clearer (ideally the standby restoration does not really need to be part
> > > of the stream thread's main loop).
> > >
> > > 5. Also for the global table's state directory, I'm assuming it will
> > > not be under the per-task directory as it is per instance. But could you
> > > elaborate a bit in the wiki about its directory as well? Also, could we
> > > consider adding https://issues.apache.org/jira/browse/KAFKA-3522 along
> > > with this feature, since we may need to change the directory path /
> > > storage schema formats for these different types of stores moving
> > > forward?
> > >
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > >> Thanks for the update Michael.
> > >>
> > >> I just wanted to add that there is one crucial piece of information
> > >> that I've failed to add (I apologise).
> > >>
> > >> To me, the join between 2 Global Tables just produces a view on top of
> > >> the underlying tables (this is the same as it works for KTables today).
> > >> So that means there is no Physical StateStore that backs the join
> > >> result; it is just a Virtual StateStore that knows how to resolve the
> > >> join when it is required. I've deliberately taken this path so that we
> > >> don't end up having yet another copy of the data, stored on local disk,
> > >> and sent to another change-log topic. This also reduces the memory
> > >> overhead from creating RocksDBStores and reduces load on the Thread
> > >> based caches we have. So it is a resource optimization.
> > >>
> > >> So while it is technically possible to support outer joins, we would
> > >> need to physically materialize the StateStore (and create a
> > >> changelog-topic for it), or we'd need to provide another interface
> > >> where the user could map from the outerJoin key to both of the other
> > >> table keys. This is because the key of the outerJoin table could be
> > >> either the key of the lhs table, or the rhs table, or something
> > >> completely different.
> > >>
> > >> With this and what you have mentioned above in mind, I think we should
> > >> park outerJoin support for this KIP and re-visit if and when we need it
> > >> in the future.
> > >>
> > >> I'll update the KIP with this.
> > >>
> > >> Thanks,
> > >> Damian
> > >>
> > >> On Fri, 9 Dec 2016 at 09:53 Michael Noll <mi...@confluent.io>
> wrote:
> > >>
> > >> > Damian and I briefly chatted offline (thanks, Damian!), and here's
> the
> > >> > summary of my thoughts and conclusion.
> > >> >
> > >> > TL;DR: Let's skip outer join support for global tables.
> > >> >
> > >> > In more detail:
> > >> >
> > >> > - We agreed that, technically, we can add OUTER JOIN support.
> > However,
> > >> > outer joins only work if certain preconditions are met.  The
> > >> preconditions
> > >> > are IMHO similar/the same as we have for the normal, partitioned
> > KTables
> > >> > (e.g. having matching keys and co-partitioned data for the tables),
> > but
> > >> in
> > >> > the case of global tables the user would need to meet all these
> > >> > preconditions in one big swing when specifying the params for the
> > outer
> > >> > join call.  Even so, you'd only know at run-time whether the
> > >> preconditions
> > >> > were actually met properly.
> > >> >
> > >> > - Hence it's quite likely that users will be confused about these
> > >> > preconditions and how to meet them, and -- from what we can tell --
> > use
> > >> > cases / user demand for outer joins have been rare.
> > >> >
> > >> > - So, long story short, even though we could add outer join support,
> > >> > we'd suggest skipping it for global tables.  If we subsequently learn
> > >> > that there is a lot of user interest in that functionality, we still
> > >> > have the option to add it in the future.
> > >> >
> > >> >
> > >> > Best,
> > >> > Michael
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <da...@gmail.com>
> > >> wrote:
> > >> >
> > >> > > Hi Michael,
> > >> > >
> > >> > > I don't see how that helps?
> > >> > >
> > >> > > Let's say we have tables Person(id, device_id, name, ...) and
> > >> > > Device(id, person_id, type, ...), and both are keyed with the same
> > >> > > type. And we have a stream that, for the sake of simplicity, has both
> > >> > > person_id and device_id (I know this is a bit contrived!).
> > >> > > So our join is:
> > >> > > person = builder.globalTable(...);
> > >> > > device = builder.globalTable(...);
> > >> > > personDevice = person.outerJoin(device, ...);
> > >> > >
> > >> > > someStream = builder.stream(..);
> > >> > > // which id do i use to join with? person.id? device.id?
> > >> > > someStream.leftJoin(personDevice, ...)
> > >> > >
> > >> > > // Interactive Query on the view generated by the join of person and
> > >> > > // device
> > >> > > personDeviceStore = streams.store("personDevice",...);
> > >> > > // person.id? device.id?
> > >> > > personDeviceStore.get(someId);
> > >> > >
> > >> > > We get records
> > >> > > person id=1, device_id=2 ,...
> > >> > > device id=2, person_id=1, ...
> > >> > > stream person_id = 1, device_id = 2
> > >> > >
> > >> > > We could do the join between the GlobalTables both ways, as each side
> > >> > > could map to the other side's key, but when I'm accessing the
> > >> > > resulting table, personDevice, what is the key? The person.id? The
> > >> > > device.id? It can't be both of them.
> > >> > >
> > >> > > Thanks,
> > >> > > Damian
> > >> > >
> > >> > >
> > >> > >
> > >> > >
> > >> > > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io>
> > >> wrote:
> > >> > >
> > >> > > > The key type returned by both KeyValueMappers (in the current
> > trunk
> > >> > > > version, that type is named `R`) would need to be the same for
> > this
> > >> to
> > >> > > > work.
> > >> > > >
> > >> > > >
> > >> > > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <
> damian.guy@gmail.com>
> > >> > wrote:
> > >> > > >
> > >> > > > > Michael,
> > >> > > > >
> > >> > > > > We can only support outerJoin if both tables are keyed the same
> > >> > > > > way. Let's say, for example, you can map both ways, but the key
> > >> > > > > for each table is of a different type. So t1 is long and t2 is
> > >> > > > > string - what is the key type of the resulting GlobalKTable? So
> > >> > > > > when you subsequently join to this table, and do a lookup on it,
> > >> > > > > which key are you using?
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Damian
> > >> > > > >
> > >> > > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <
> michael@confluent.io>
> > >> > wrote:
> > >> > > > >
> > >> > > > > > Damian,
> > >> > > > > >
> > >> > > > > > yes, that makes sense.
> > >> > > > > >
> > >> > > > > > But I am still wondering:  In your example, there's no prior
> > >> > > knowledge
> > >> > > > > "can
> > >> > > > > > I map from t1->t2" that Streams can leverage for joining t1
> > and
> > >> t2
> > >> > > > other
> > >> > > > > > than blindly relying on the user to provide an appropriate
> > >> > > > KeyValueMapper
> > >> > > > > > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow
> > the
> > >> > user
> > >> > > > to
> > >> > > > > > provide a KeyValueMapper from t1->t2 (Streams does not know
> at
> > >> > > compile
> > >> > > > > time
> > >> > > > > > whether this mapping will actually work), then we can also
> > allow
> > >> > the
> > >> > > > user
> > >> > > > > > to provide a corresponding "reverse" mapper from t2->t1.
> That
> > >> is,
> > >> > we
> > >> > > > > could
> > >> > > > > > say that an outer join between two global tables IS
> supported,
> > >> but
> > >> > if
> > >> > > > and
> > >> > > > > > only if the user provides two KeyValueMappers, one for
> t1->t2
> > >> and
> > >> > one
> > >> > > > for
> > >> > > > > > t2->t1.
> > >> > > > > >
> > >> > > > > > The left join t1->t2 (which is supported in the KIP), in general,
> > >> > > > > > works only because of the existence of the user-provided
> > >> > > > > > KeyValueMapper from t1->t2.  The outer join, as you point out,
> > >> > > > > > cannot be satisfied as easily because Streams must know two
> > >> > > > > > mappers, t1->t2 plus t2->t1 -- otherwise the outer join won't work.
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <
> > >> damian.guy@gmail.com>
> > >> > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi Michael,
> > >> > > > > > >
> > >> > > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > >> > > > > > > t1{
> > >> > > > > > >  int key;
> > >> > > > > > >  string t2_id;
> > >> > > > > > >  ...
> > >> > > > > > > }
> > >> > > > > > >
> > >> > > > > > > t2 {
> > >> > > > > > >   string key;
> > >> > > > > > >   ..
> > >> > > > > > > }
> > >> > > > > > > If we create global tables out of these we'd get:
> > >> > > > > > > GlobalKTable<Integer, ?> t1;
> > >> > > > > > > GlobalKTable<String, ?> t2;
> > >> > > > > > >
> > >> > > > > > > So the join can only go in 1 direction, i.e, from t1 -> t2
> > as
> > >> in
> > >> > > > order
> > >> > > > > to
> > >> > > > > > > perform the join we need to use a KeyValueMapper to
> extract
> > >> the
> > >> > t2
> > >> > > > key
> > >> > > > > > from
> > >> > > > > > > the t1 value.
> > >> > > > > > >
> > >> > > > > > > Does that make sense?
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Damian
> > >> > > > > > >
> > >> > > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <
> > >> michael@confluent.io>
> > >> > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may
> > >> > > > > > > > > be keyed differently. So you need to use the key from the
> > >> > > > > > > > > left side of the join along with the KeyValueMapper to
> > >> > > > > > > > > resolve the right side of the join. This won't work the
> > >> > > > > > > > > other way around.
> > >> > > > > > > >
> > >> > > > > > > > Care to elaborate why it won't work the other way
> around?
> > >> If,
> > >> > > for
> > >> > > > > > > example,
> > >> > > > > > > > we swapped the call from leftTable.join(rightTable) to
> > >> > > > > > > > rightTable.join(leftTable), that join would work, too.
> > >> > Perhaps I
> > >> > > > am
> > >> > > > > > > > missing something though. :-)
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > >
> > >> > > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <
> > >> > > damian.guy@gmail.com>
> > >> > > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Hi Matthias,
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks for the feedback.
> > >> > > > > > > > >
> > >> > > > > > > > > There is no outer-join for GlobalKTables as the tables may
> > >> > > > > > > > > be keyed differently. So you need to use the key from the
> > >> > > > > > > > > left side of the join along with the KeyValueMapper to
> > >> > > > > > > > > resolve the right side of the join. This won't work the
> > >> > > > > > > > > other way around.
> > >> > > > > > > > >
> > >> > > > > > > > > On the bootstrapping concern. If the application is
> > >> failing
> > >> > > > before
> > >> > > > > > > > > bootstrapping finishes, the problem is likely to be
> > >> related
> > >> > to
> > >> > > a
> > >> > > > > > > terminal
> > >> > > > > > > > > exception, i.e., running out of disk space, corrupt
> > state
> > >> > > stores
> > >> > > > > etc.
> > >> > > > > > > In
> > >> > > > > > > > > these cases, we wouldn't want the application to
> > continue.
> > >> > So i
> > >> > > > > think
> > >> > > > > > > > this
> > >> > > > > > > > > is ok.
> > >> > > > > > > > >
> > >> > > > > > > > > Thanks,
> > >> > > > > > > > > Damian
> > >> > > > > > > > >
> > >> > > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> > >> > > > matthias@confluent.io
> > >> > > > > >
> > >> > > > > > > > wrote:
> > >> > > > > > > > >
> > >> > > > > > > > > > Thanks for the KIP Damian. Very nice motivating
> > example!
> > >> > > > > > > > > >
> > >> > > > > > > > > > A few comments:
> > >> > > > > > > > > >
> > >> > > > > > > > > >  - why is there no outer-join for GlobalKTables
> > >> > > > > > > > > >  - on bootstrapping GlobalKTable, could it happen
> that
> > >> this
> > >> > > > never
> > >> > > > > > > > > > finishes if the application fails before
> bootstrapping
> > >> > > finishes
> > >> > > > > and
> > >> > > > > > > new
> > >> > > > > > > > > > data gets written at the same time? Do we need to
> > guard
> > >> > > against
> > >> > > > > > this
> > >> > > > > > > > > > (seems to be a very rare corner case, so maybe not
> > >> > required)?
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > -Matthias
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > >> > > > > > > > > > > Hi all,
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > I would like to start the discussion on KIP-99:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > >> > > > > > > > > action?pageId=67633649
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Looking forward to your feedback.
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > Thanks,
> > >> > > > > > > > > > > Damian
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > --
> > >> > > > > > *Michael G. Noll*
> > >> > > > > > Product Manager | Confluent
> > >> > > > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > >> > > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > >> > > > > > <http://www.confluent.io/blog>
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Guozhang,

Thanks for your input. Answers below, but I'm thinking we should remove
joins from GlobalKTables for the time being and revisit if necessary in
the future.

1. With a global table the joins are never really materialized (at least
as I see it); rather, they are just views on the existing global tables.
I've deliberately taken this approach so we don't have to create yet
another State Store, changelog topic, etc. These all consume resources
that I believe are unnecessary, so I don't really see the point of having
a materialize method. Further, one of the major benefits of joining two
global tables is being able to query the result via Interactive Queries.
For this you need the name, so I think it makes sense to provide it with
the join.

2. This has been discussed already in this thread (with Michael), and
outerJoin is deliberately not part of the KIP. To be able to join both
ways, as you suggest, requires that both inputs are able to map to the same
key. This is not always going to be possible, since relationships can be
one-way. For that reason I felt it was best not to go down that path: we'd
not be able to resolve it at the time that
globalTable.join(otherGlobalTable,...) was called, and this would likely
cause confusion. Also, to support this we'd need to physically
materialize a StateStore that represents the join (which I think is a waste
of resources), or we'd need to provide another interface where we can map
from the key of the resulting global table to the keys of both of the
joined tables.

3. The intention is that the GlobalKTables are in a single topology that is
owned and updated by a single thread. So yes it is necessary that they can
be created separately.

4. Bootstrapping and maintaining the state of GlobalKTables is done on
a single thread. This thread will run simultaneously with the current
StreamThreads. It doesn't make sense to move the bootstrapping of the
StandbyTasks to this thread, as they are logically part of a StreamThread:
they are 'assigned' to the StreamThread. With GlobalKTables there is no
assignment as such; the thread just maintains all of them.

5. Yes, I'll update the KIP - the state directory will be under the same
path as StreamsConfig.STATE_DIR_CONFIG, but it will be a specific
directory, i.e., global_state, rather than being a task directory.

6. The whole point of GlobalKTables is to have a copy of ALL of the data on
each node. I don't think it makes sense to be able to reset the starting
position.
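
To make points 1 and 2 a bit more concrete, here is a minimal sketch in
plain Java of what a join "view" over two global tables could look like. It
is not the Kafka Streams API: the class JoinedView, its fields, and the use
of in-memory maps in place of state stores are all assumptions made for
illustration only. The tables reuse the t1/t2 shape from earlier in the
thread (t1 keyed by an int and carrying a t2 id, t2 keyed by a string).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Hypothetical illustration: a read-only view over two fully replicated
 * tables. Nothing is stored for the join result; it is resolved on lookup.
 */
public class JoinedView<K, V, K1, V1, R> {
    private final Map<K, V> left;                 // stands in for the left global table
    private final Map<K1, V1> right;              // stands in for the right global table
    private final BiFunction<K, V, K1> keyMapper; // maps a left record to a right key
    private final BiFunction<V, V1, R> joiner;    // combines the two values

    public JoinedView(Map<K, V> left,
                      Map<K1, V1> right,
                      BiFunction<K, V, K1> keyMapper,
                      BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    /** Resolves the inner join for one left-hand key at lookup time. */
    public R get(K key) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null; // no left record, so no join result
        }
        K1 rightKey = keyMapper.apply(key, leftValue);
        V1 rightValue = right.get(rightKey);
        if (rightValue == null) {
            return null; // inner join: both sides must be present
        }
        return joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new HashMap<>(); // int key -> t2 id
        t1.put(1, "a");
        Map<String, String> t2 = new HashMap<>();  // string key -> value
        t2.put("a", "device-a");

        JoinedView<Integer, String, String, String, String> view =
            new JoinedView<>(t1, t2, (k, v) -> v, (v, v1) -> v + "/" + v1);

        System.out.println(view.get(1)); // a/device-a
        System.out.println(view.get(2)); // null
    }
}
```

Note that the view can only be looked up by the left table's key. Looking
it up by a t2 key would need either a reverse mapper or a physically
materialized result, which is the trade-off described in point 2.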

Thanks,
Damian

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
One more thing to add:

6. A GlobalKTable is always bootstrapped from the beginning of its topic,
while for other KTables we are enabling users to override the reset
position, as in

https://github.com/apache/kafka/pull/2007

Should we consider doing the same for GlobalKTable as well?
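
To make the trade-off concrete, here is a minimal sketch in plain Java (no
Kafka classes) of what bootstrapping from a chosen position means. The
class name, the method name, and the list-of-arrays changelog are
assumptions made purely for illustration; they are not part of the KIP or
of the Streams API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a changelog is a list of {key, value} records and the
// table is rebuilt by replaying records starting at a given offset.
final class BootstrapSketch {

    /** Replays records from startOffset to the end of the changelog into a fresh table. */
    static Map<String, String> bootstrap(List<String[]> changelog, int startOffset) {
        Map<String, String> table = new HashMap<>();
        for (int offset = startOffset; offset < changelog.size(); offset++) {
            String key = changelog.get(offset)[0];
            String value = changelog.get(offset)[1];
            if (value == null) {
                table.remove(key);      // a null value acts as a delete
            } else {
                table.put(key, value);  // later records overwrite earlier ones
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String[]> changelog = Arrays.asList(
                new String[] {"a", "1"},
                new String[] {"b", "2"},
                new String[] {"a", "3"});
        // Starting at offset 0 sees every key; starting at offset 2 never sees "b".
        System.out.println(bootstrap(changelog, 0).keySet());
        System.out.println(bootstrap(changelog, 2).keySet());
    }
}
```

In this model, any start position other than the beginning can leave keys
missing from the table, which is the main thing to weigh before allowing
the override for a global table.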


Guozhang


On Tue, Dec 20, 2016 at 11:39 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Thanks for the very well written proposal, and sorry for the very late
> review. I have a few comments here:
>
> 1. We are introducing a "queryableViewName" in the GlobalKTable join
> results. I'm wondering if we should instead add a more general function
> like "materialize" to KTable and GlobalKTable, taking the name to be used
> in queries?
>
> 2. For GlobalKTable's own "join" and "leftJoin": since we are only passing
> the KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only
> the left-hand side will logically "trigger" the join, which is different
> from KTable's join semantics. I'm wondering if it would be more consistent
> to have them as:
>
>
> <K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
>                                     final KeyValueMapper<K, V, K1> leftKeyMapper,
>                                     final KeyValueMapper<K1, V1, K> rightKeyMapper,
>                                     final ValueJoiner<V, V1, R> joiner,
>                                     final String queryableViewName);
>
> <K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
>                                          final KeyValueMapper<K, V, K1> leftKeyMapper,
>                                          final KeyValueMapper<K1, V1, K> rightKeyMapper,
>                                          final ValueJoiner<V, V1, R> joiner,
>                                          final String queryableViewName);
>
> <K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
>                                         final KeyValueMapper<K, V, K1> keyMapper,
>                                         final ValueJoiner<V, V1, R> joiner,
>                                         final String queryableViewName);
>
>
> I.e. add another directional key mapper to join and also to outerJoin.
>
>
> 3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to have
> a separate function from "TopologyBuilder.build" itself? With global
> tables, are there any scenarios in which we want to build the topology
> without the embedded global tables (i.e. still calling "build")?
>
> 4. As for implementation, you mentioned that global table bootstrapping
> will be done in another dedicated thread. Could we also consider moving the
> logic of bootstrapping the standby-replica state stores into this thread as
> well, which could then leverage the existing "restoreConsumer" that does
> not participate in the consumer group protocol? By doing this I think we
> can still avoid thread synchronization while making the logic clearer
> (ideally the standby restoration does not really need to be part of the
> stream thread's main loop).
>
> 5. Also, for the global table's state directory, I'm assuming it will not
> be under the per-task directory, as it is per instance. Could you elaborate
> a bit in the wiki about its directory as well? Also, could we consider
> addressing https://issues.apache.org/jira/browse/KAFKA-3522 along with this
> feature, since we may need to change the directory path / storage schema
> formats for these different types of stores moving forward?
>
>
>
> Guozhang
>
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks for the very well written proposal, and sorry for the very late
review. I have a few comments here:

1. We are introducing a "queryableViewName" in the GlobalKTable join
results. I'm wondering if we should instead add a more general function
like "materialize" to KTable and GlobalKTable, taking the name to be used
in queries?

2. For GlobalKTable's own "join" and "leftJoin": since we are only passing
the KeyValueMapper<K, V, K1> keyMapper, it seems that in either case only
the left-hand side will logically "trigger" the join, which is different
from KTable's join semantics. I'm wondering if it would be more consistent
to have them as:


<K1, V1, R> GlobalKTable<K, R> join(final GlobalKTable<K1, V1> other,
                                    final KeyValueMapper<K, V, K1> leftKeyMapper,
                                    final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                    final ValueJoiner<V, V1, R> joiner,
                                    final String queryableViewName);

<K1, V1, R> GlobalKTable<K, R> outerJoin(final GlobalKTable<K1, V1> other,
                                         final KeyValueMapper<K, V, K1> leftKeyMapper,
                                         final KeyValueMapper<K1, V1, K> rightKeyMapper,
                                         final ValueJoiner<V, V1, R> joiner,
                                         final String queryableViewName);

<K1, V1, R> GlobalKTable<K, R> leftJoin(final GlobalKTable<K1, V1> other,
                                        final KeyValueMapper<K, V, K1> keyMapper,
                                        final ValueJoiner<V, V1, R> joiner,
                                        final String queryableViewName);


I.e. add another directional key mapper to join and also to outerJoin.
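
As a rough illustration of why a single key mapper makes the left-hand
side drive the join, here is a minimal sketch that uses plain maps in
place of global tables. GlobalJoinSketch, its leftJoin method, and the use
of BiFunction for the mapper and joiner are all hypothetical stand-ins,
not the proposed API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for a left join between two global tables, where the
// right-hand key is derived from each left-hand record by a key mapper.
final class GlobalJoinSketch {

    /** Every left record appears in the result; a missing right value is passed as null. */
    static <K, V, K1, V1, R> Map<K, R> leftJoin(Map<K, V> left,
                                                Map<K1, V1> right,
                                                BiFunction<K, V, K1> keyMapper,
                                                BiFunction<V, V1, R> joiner) {
        Map<K, R> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> entry : left.entrySet()) {
            // Only a left record can start a lookup: the mapper goes left -> right.
            K1 rightKey = keyMapper.apply(entry.getKey(), entry.getValue());
            V1 rightValue = right.get(rightKey);
            result.put(entry.getKey(), joiner.apply(entry.getValue(), rightValue));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new LinkedHashMap<>();
        t1.put(1, "a");    // the value carries the key of the matching t2 record
        t1.put(2, "zz");   // no matching t2 record
        Map<String, String> t2 = new LinkedHashMap<>();
        t2.put("a", "alpha");

        Map<Integer, String> joined =
                leftJoin(t1, t2, (k, v) -> v, (v, v1) -> v + "/" + v1);
        System.out.println(joined); // prints {1=a/alpha, 2=zz/null}
    }
}
```

Nothing in this sketch can start from a t2 record and find the matching t1
records, which is the gap a second, right-to-left key mapper would be
meant to close.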


3. For "TopologyBuilder.buildGlobalStateTopology", is it necessary to have
a separate function from "TopologyBuilder.build" itself? With global
tables, are there any scenarios in which we want to build the topology
without the embedded global tables (i.e. still calling "build")?

4. As for implementation, you mentioned that global table bootstrapping
will be done in another dedicated thread. Could we also consider moving the
logic of bootstrapping the standby-replica state stores into this thread as
well, which could then leverage the existing "restoreConsumer" that does
not participate in the consumer group protocol? By doing this I think we
can still avoid thread synchronization while making the logic clearer
(ideally the standby restoration does not really need to be part of the
stream thread's main loop).

5. Also, for the global table's state directory, I'm assuming it will not
be under the per-task directory, as it is per instance. Could you elaborate
a bit in the wiki about its directory as well? Also, could we consider
addressing https://issues.apache.org/jira/browse/KAFKA-3522 along with this
feature, since we may need to change the directory path / storage schema
formats for these different types of stores moving forward?



Guozhang


On Fri, Dec 9, 2016 at 4:21 AM, Damian Guy <da...@gmail.com> wrote:

> Thanks for the update, Michael.
>
> I just wanted to add one crucial piece of information that I failed to
> mention earlier (I apologise).
>
> To me, the join between two global tables just produces a view on top of
> the underlying tables (this is the same as how it works for KTables today).
> That means there is no physical StateStore backing the join result; it is
> just a virtual StateStore that knows how to resolve the join when required.
> I've deliberately taken this path so that we don't end up with yet another
> copy of the data, stored on local disk and sent to another changelog topic.
> This also reduces the memory overhead of creating RocksDB stores and
> reduces load on the thread-based caches we have. So it is a resource
> optimization.
>
> So while it is technically possible to support outer joins, we would need
> to either physically materialize the StateStore (and create a changelog
> topic for it), or provide another interface where the user could map
> from the outerJoin key to both of the other tables' keys. This is because
> the key of the outerJoin table could be the key of the lhs table, the key
> of the rhs table, or something completely different.
>
> With this and what you have mentioned above in mind, I think we should park
> outerJoin support for this KIP and revisit it if and when we need it in the
> future.
>
> I'll update the KIP with this.
>
> Thanks,
> Damian
>



-- 
-- Guozhang

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Thanks for the update, Michael.

I just wanted to add one crucial piece of information that I failed to
mention earlier (I apologise).

To me, the join between two global tables just produces a view on top of
the underlying tables (this is the same as how it works for KTables today).
That means there is no physical StateStore backing the join result; it is
just a virtual StateStore that knows how to resolve the join when required.
I've deliberately taken this path so that we don't end up with yet another
copy of the data, stored on local disk and sent to another changelog topic.
This also reduces the memory overhead of creating RocksDB stores and
reduces load on the thread-based caches we have. So it is a resource
optimization.
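
A minimal sketch of the idea, with plain maps standing in for the two
underlying global tables: the view stores nothing itself and resolves each
lookup at read time. The JoinViewSketch class and its members are
hypothetical names for illustration only, not the actual implementation,
and inner-join semantics are assumed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical read-only view over two tables; it holds no copy of the joined data.
final class JoinViewSketch<K, V, K1, V1, R> {
    private final Map<K, V> left;
    private final Map<K1, V1> right;
    private final BiFunction<K, V, K1> keyMapper;
    private final BiFunction<V, V1, R> joiner;

    JoinViewSketch(Map<K, V> left,
                   Map<K1, V1> right,
                   BiFunction<K, V, K1> keyMapper,
                   BiFunction<V, V1, R> joiner) {
        this.left = left;
        this.right = right;
        this.keyMapper = keyMapper;
        this.joiner = joiner;
    }

    /** Resolves the join for one left-hand key on demand; returns null if either side is missing. */
    R get(K key) {
        V leftValue = left.get(key);
        if (leftValue == null) {
            return null;
        }
        V1 rightValue = right.get(keyMapper.apply(key, leftValue));
        return rightValue == null ? null : joiner.apply(leftValue, rightValue);
    }

    public static void main(String[] args) {
        Map<Integer, String> person = new HashMap<>();
        Map<String, String> device = new HashMap<>();
        person.put(1, "d-2"); // the person record carries its device id

        JoinViewSketch<Integer, String, String, String, String> view =
                new JoinViewSketch<>(person, device, (k, v) -> v, (v, v1) -> v + ":" + v1);

        System.out.println(view.get(1)); // no device yet, so no join result
        device.put("d-2", "phone");      // update the underlying table only
        System.out.println(view.get(1)); // the view reflects the update immediately
    }
}
```

Because the result is computed on each read, there is no second store to
keep in sync and no extra changelog topic. The lookup is also keyed by the
left-hand key only, which is the limitation that makes outer joins awkward
for such a view.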

So while it is technically possible to support outer joins, we would need
to either physically materialize the StateStore (and create a changelog
topic for it), or provide another interface where the user could map
from the outerJoin key to both of the other tables' keys. This is because
the key of the outerJoin table could be the key of the lhs table, the key
of the rhs table, or something completely different.

With this and what you have mentioned above in mind, I think we should park
outerJoin support for this KIP and revisit it if and when we need it in the
future.

I'll update the KIP with this.

Thanks,
Damian

On Fri, 9 Dec 2016 at 09:53 Michael Noll <mi...@confluent.io> wrote:

> Damian and I briefly chatted offline (thanks, Damian!), and here's the
> summary of my thoughts and conclusion.
>
> TL;DR: Let's skip outer join support for global tables.
>
> In more detail:
>
> - We agreed that, technically, we can add OUTER JOIN support.  However,
> outer joins only work if certain preconditions are met.  The preconditions
> are IMHO similar/the same as we have for the normal, partitioned KTables
> (e.g. having matching keys and co-partitioned data for the tables), but in
> the case of global tables the user would need to meet all these
> preconditions in one big swing when specifying the params for the outer
> join call.  Even so, you'd only know at run-time whether the preconditions
> were actually met properly.
>
> - Hence it's quite likely that users will be confused about these
> preconditions and how to meet them, and -- from what we can tell -- use
> cases / user demand for outer joins have been rare.
>
> - So, long story short, even though we could add outer join support we'd
> suggest to skip it for global tables.  If we subsequently learn that there is
> a lot of user interest in that functionality, we still have the option to add
> it in the future.
>
>
> Best,
> Michael
>
>
>
>
>
>
> On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Hi Michael,
> >
> > I don't see how that helps?
> >
> > Lets say we have tables Person(id, device_id, name, ...), Device(id,
> > person_id, type, ...), and both are keyed with same type. And we have a
> > stream, that for the sake of simplicity, has both person_id and
> device_id (
> > i know this is a bit contrived!)
> > so our join
> > person = builder.globalTable(...);
> > device = builder.globalTable(...);
> > personDevice = builder.outerJoin(device, ...);
> >
> > someStream = builder.stream(..);
> > // which id do i use to join with? person.id? device.id?
> > someStream.leftJoin(personDevice, ...)
> >
> > // Interactive Query on the view generated by the join of person and
> device
> > personDeviceStore = streams.store("personDevice",...);
> > // person.id? device.id?
> > personDeviceStore.get(someId);
> >
> > We get records
> > person id=1, device_id=2 ,...
> > device id=2, person_id=1, ...
> > stream person_id = 1, device_id = 2
> >
> > We could do the join between the GlobalTables both ways as each side
> could
> > map to the other sides key, but when i'm accessing the resulting table,
> > personDevice, what is the key? The person.id ? the device.id? it can't
> be
> > both of them.
> >
> > Thanks,
> > Damian
> >
> >
> >
> >
> > On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io> wrote:
> >
> > > The key type returned by both KeyValueMappers (in the current trunk
> > > version, that type is named `R`) would need to be the same for this to
> > > work.
> > >
> > >
> > > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > > > Michael,
> > > >
> > > > We can only support outerJoin if both tables are keyed the same way.
> > Lets
> > > > say for example you can map both ways, however, the key for each
> table
> > is
> > > > of a different type. So t1 is long and t2 is string - what is the key
> > > type
> > > > of the resulting GlobalKTable? So when you subsequently join to this
> > > table,
> > > > and do a lookup on it, which key are you using?
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mi...@confluent.io>
> wrote:
> > > >
> > > > > Damian,
> > > > >
> > > > > yes, that makes sense.
> > > > >
> > > > > But I am still wondering:  In your example, there's no prior
> > knowledge
> > > > "can
> > > > > I map from t1->t2" that Streams can leverage for joining t1 and t2
> > > other
> > > > > than blindly relying on the user to provide an appropriate
> > > KeyValueMapper
> > > > > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the
> user
> > > to
> > > > > provide a KeyValueMapper from t1->t2 (Streams does not know at
> > compile
> > > > time
> > > > > whether this mapping will actually work), then we can also allow
> the
> > > user
> > > > > to provide a corresponding "reverse" mapper from t2->t1.  That is,
> we
> > > > could
> > > > > say that an outer join between two global tables IS supported, but
> if
> > > and
> > > > > only if the user provides two KeyValueMappers, one for t1->t2 and
> one
> > > for
> > > > > t2->t1.
> > > > >
> > > > > The left join t1->t2 (which is supported in the KIP), in general,
> > works
> > > > > only because of the existence of the user-provided KeyValueMapper
> > from
> > > > > t1->t2.  The outer join, as you point out, cannot satisfied as
> easily
> > > > > because Streams must know two mappers, t1->t2 plus t2->t1 --
> > otherwise
> > > > the
> > > > > outer join won't work.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <da...@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Michael,
> > > > > >
> > > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > > > > t1{
> > > > > >  int key;
> > > > > >  string t2_id;
> > > > > >  ...
> > > > > > }
> > > > > >
> > > > > > t2 {
> > > > > >   string key;
> > > > > >   ..
> > > > > > }
> > > > > > If we create global tables out of these we'd get:
> > > > > > GlobalKTable<Integer, ?> t1;
> > > > > > GlobalKTable<String, ?> t2;
> > > > > >
> > > > > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in
> > > order
> > > > to
> > > > > > perform the join we need to use a KeyValueMapper to extract the
> t2
> > > key
> > > > > from
> > > > > > the t1 value.
> > > > > >
> > > > > > Does that make sense?
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mi...@confluent.io>
> > > wrote:
> > > > > >
> > > > > > > > There is no outer-join for GlobalKTables as the tables may be
> > > keyed
> > > > > > > > differently. So you need to use the key from the left side of
> > the
> > > > > join
> > > > > > > > along with the KeyValueMapper to resolve the right side of
> the
> > > > join.
> > > > > > This
> > > > > > > > wont work the other way around.
> > > > > > >
> > > > > > > Care to elaborate why it won't work the other way around?  If,
> > for
> > > > > > example,
> > > > > > > we swapped the call from leftTable.join(rightTable) to
> > > > > > > rightTable.join(leftTable), that join would work, too.
> Perhaps I
> > > am
> > > > > > > missing something though. :-)
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <
> > damian.guy@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Matthias,
> > > > > > > >
> > > > > > > > Thanks for the feedback.
> > > > > > > >
> > > > > > > > There is no outer-join for GlobalKTables as the tables may be
> > > keyed
> > > > > > > > differently. So you need to use the key from the left side of
> > the
> > > > > join
> > > > > > > > along with the KeyValueMapper to resolve the right side of
> the
> > > > join.
> > > > > > This
> > > > > > > > wont work the other way around.
> > > > > > > >
> > > > > > > > On the bootstrapping concern. If the application is failing
> > > before
> > > > > > > > bootstrapping finishes, the problem is likely to be related
> to
> > a
> > > > > > terminal
> > > > > > > > exception, i.e., running out of disk space, corrupt state
> > stores
> > > > etc.
> > > > > > In
> > > > > > > > these cases, we wouldn't want the application to continue.
> So i
> > > > think
> > > > > > > this
> > > > > > > > is ok.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Damian
> > > > > > > >
> > > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> > > matthias@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > > > > > > >
> > > > > > > > > A few comments:
> > > > > > > > >
> > > > > > > > >  - why is there no outer-join for GlobalKTables
> > > > > > > > >  - on bootstrapping GlobalKTable, could it happen that this
> > > never
> > > > > > > > > finishes if the application fails before bootstrapping
> > finishes
> > > > and
> > > > > > new
> > > > > > > > > data gets written at the same time? Do we need to guard
> > against
> > > > > this
> > > > > > > > > (seems to be a very rare corner case, so maybe not
> required)?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > > > > > > > > Hi all,
> > > > > > > > > >
> > > > > > > > > > I would like to start the discussion on KIP-99:
> > > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > > > > > action?pageId=67633649
> > > > > > > > > >
> > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Damian
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > *Michael G. Noll*
> > > > > Product Manager | Confluent
> > > > > +1 650 453 5860 <(650)%20453-5860> <(650)%20453-5860>
> <(650)%20453-5860> | @miguno <
> > > https://twitter.com/miguno
> > > > >
> > > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > > > > <http://www.confluent.io/blog>
> > > > >
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
Damian and I briefly chatted offline (thanks, Damian!), and here's the
summary of my thoughts and conclusion.

TL;DR: Let's skip outer join support for global tables.

In more detail:

- We agreed that, technically, we can add OUTER JOIN support.  However,
outer joins only work if certain preconditions are met.  The preconditions
are IMHO similar to (or the same as) those we have for the normal,
partitioned KTables (e.g. having matching keys and co-partitioned data for
the tables), but in the case of global tables the user would need to meet
all these preconditions in one go when specifying the params for the outer
join call.  Even so, you'd only know at run-time whether the preconditions
were actually met.

- Hence it's quite likely that users will be confused about these
preconditions and how to meet them, and -- from what we can tell -- use
cases / user demand for outer joins have been rare.

- So, long story short, even though we could add outer join support, we'd
suggest skipping it for global tables.  If we subsequently learn that there
is a lot of user interest in that functionality, we still have the option to
add it in the future.


Best,
Michael






On Thu, Dec 8, 2016 at 6:31 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Michael,
>
> I don't see how that helps?
>
> Lets say we have tables Person(id, device_id, name, ...), Device(id,
> person_id, type, ...), and both are keyed with same type. And we have a
> stream, that for the sake of simplicity, has both person_id and device_id (
> i know this is a bit contrived!)
> so our join
> person = builder.globalTable(...);
> device = builder.globalTable(...);
> personDevice = builder.outerJoin(device, ...);
>
> someStream = builder.stream(..);
> // which id do i use to join with? person.id? device.id?
> someStream.leftJoin(personDevice, ...)
>
> // Interactive Query on the view generated by the join of person and device
> personDeviceStore = streams.store("personDevice",...);
> // person.id? device.id?
> personDeviceStore.get(someId);
>
> We get records
> person id=1, device_id=2 ,...
> device id=2, person_id=1, ...
> stream person_id = 1, device_id = 2
>
> We could do the join between the GlobalTables both ways as each side could
> map to the other sides key, but when i'm accessing the resulting table,
> personDevice, what is the key? The person.id ? the device.id? it can't be
> both of them.
>
> Thanks,
> Damian
>
>
>
>
> On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io> wrote:
>
> > The key type returned by both KeyValueMappers (in the current trunk
> > version, that type is named `R`) would need to be the same for this to
> > work.
> >
> >
> > On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <da...@gmail.com> wrote:
> >
> > > Michael,
> > >
> > > We can only support outerJoin if both tables are keyed the same way.
> Lets
> > > say for example you can map both ways, however, the key for each table
> is
> > > of a different type. So t1 is long and t2 is string - what is the key
> > type
> > > of the resulting GlobalKTable? So when you subsequently join to this
> > table,
> > > and do a lookup on it, which key are you using?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mi...@confluent.io> wrote:
> > >
> > > > Damian,
> > > >
> > > > yes, that makes sense.
> > > >
> > > > But I am still wondering:  In your example, there's no prior
> knowledge
> > > "can
> > > > I map from t1->t2" that Streams can leverage for joining t1 and t2
> > other
> > > > than blindly relying on the user to provide an appropriate
> > KeyValueMapper
> > > > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user
> > to
> > > > provide a KeyValueMapper from t1->t2 (Streams does not know at
> compile
> > > time
> > > > whether this mapping will actually work), then we can also allow the
> > user
> > > > to provide a corresponding "reverse" mapper from t2->t1.  That is, we
> > > could
> > > > say that an outer join between two global tables IS supported, but if
> > and
> > > > only if the user provides two KeyValueMappers, one for t1->t2 and one
> > for
> > > > t2->t1.
> > > >
> > > > The left join t1->t2 (which is supported in the KIP), in general,
> works
> > > > only because of the existence of the user-provided KeyValueMapper
> from
> > > > t1->t2.  The outer join, as you point out, cannot satisfied as easily
> > > > because Streams must know two mappers, t1->t2 plus t2->t1 --
> otherwise
> > > the
> > > > outer join won't work.
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <da...@gmail.com>
> > wrote:
> > > >
> > > > > Hi Michael,
> > > > >
> > > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > > > t1{
> > > > >  int key;
> > > > >  string t2_id;
> > > > >  ...
> > > > > }
> > > > >
> > > > > t2 {
> > > > >   string key;
> > > > >   ..
> > > > > }
> > > > > If we create global tables out of these we'd get:
> > > > > GlobalKTable<Integer, ?> t1;
> > > > > GlobalKTable<String, ?> t2;
> > > > >
> > > > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in
> > order
> > > to
> > > > > perform the join we need to use a KeyValueMapper to extract the t2
> > key
> > > > from
> > > > > the t1 value.
> > > > >
> > > > > Does that make sense?
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mi...@confluent.io>
> > wrote:
> > > > >
> > > > > > > There is no outer-join for GlobalKTables as the tables may be
> > keyed
> > > > > > > differently. So you need to use the key from the left side of
> the
> > > > join
> > > > > > > along with the KeyValueMapper to resolve the right side of the
> > > join.
> > > > > This
> > > > > > > wont work the other way around.
> > > > > >
> > > > > > Care to elaborate why it won't work the other way around?  If,
> for
> > > > > example,
> > > > > > we swapped the call from leftTable.join(rightTable) to
> > > > > > rightTable.join(leftTable), that join would work, too.  Perhaps I
> > am
> > > > > > missing something though. :-)
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <
> damian.guy@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Matthias,
> > > > > > >
> > > > > > > Thanks for the feedback.
> > > > > > >
> > > > > > > There is no outer-join for GlobalKTables as the tables may be
> > keyed
> > > > > > > differently. So you need to use the key from the left side of
> the
> > > > join
> > > > > > > along with the KeyValueMapper to resolve the right side of the
> > > join.
> > > > > This
> > > > > > > wont work the other way around.
> > > > > > >
> > > > > > > On the bootstrapping concern. If the application is failing
> > before
> > > > > > > bootstrapping finishes, the problem is likely to be related to
> a
> > > > > terminal
> > > > > > > exception, i.e., running out of disk space, corrupt state
> stores
> > > etc.
> > > > > In
> > > > > > > these cases, we wouldn't want the application to continue. So i
> > > think
> > > > > > this
> > > > > > > is ok.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> > matthias@confluent.io
> > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > > > > > >
> > > > > > > > A few comments:
> > > > > > > >
> > > > > > > >  - why is there no outer-join for GlobalKTables
> > > > > > > >  - on bootstrapping GlobalKTable, could it happen that this
> > never
> > > > > > > > finishes if the application fails before bootstrapping
> finishes
> > > and
> > > > > new
> > > > > > > > data gets written at the same time? Do we need to guard
> against
> > > > this
> > > > > > > > (seems to be a very rare corner case, so maybe not required)?
> > > > > > > >
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > >
> > > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I would like to start the discussion on KIP-99:
> > > > > > > > >
> > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > > > > action?pageId=67633649
> > > > > > > > >
> > > > > > > > > Looking forward to your feedback.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Damian
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > *Michael G. Noll*
> > > > Product Manager | Confluent
> > > > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> > > >
> > > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > > > <http://www.confluent.io/blog>
> > > >
> > >
> >
>

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Michael,

I don't see how that helps?

Let's say we have tables Person(id, device_id, name, ...) and Device(id,
person_id, type, ...), and both are keyed with the same type. And we have a
stream that, for the sake of simplicity, has both person_id and device_id
(I know this is a bit contrived!).
So our join is:
person = builder.globalTable(...);
device = builder.globalTable(...);
personDevice = builder.outerJoin(device, ...);

someStream = builder.stream(..);
// which id do i use to join with? person.id? device.id?
someStream.leftJoin(personDevice, ...)

// Interactive Query on the view generated by the join of person and device
personDeviceStore = streams.store("personDevice",...);
// person.id? device.id?
personDeviceStore.get(someId);

We get records
person id=1, device_id=2 ,...
device id=2, person_id=1, ...
stream person_id = 1, device_id = 2

We could do the join between the GlobalTables both ways, as each side could
map to the other side's key, but when I'm accessing the resulting table,
personDevice, what is the key? The person.id? The device.id? It can't be
both of them.
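
To make the key problem concrete, here is a small sketch that materializes
an outer join of the two tables using plain Java collections. Everything in
it (OuterJoinKeyDemo, the Person and Device classes, the "person:" and
"device:" key prefixes) is invented for illustration and is not part of the
KIP. It assumes that a device row without an owner has no person id at all.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: none of these types exist in Kafka Streams.
final class OuterJoinKeyDemo {
    static final class Person {
        final int id; final Integer deviceId; final String name;
        Person(int id, Integer deviceId, String name) {
            this.id = id; this.deviceId = deviceId; this.name = name;
        }
    }

    static final class Device {
        final int id; final Integer personId; final String type;
        Device(int id, Integer personId, String type) {
            this.id = id; this.personId = personId; this.type = type;
        }
    }

    // Outer join of persons and devices. Rows that have a person are keyed by
    // the person id; devices with no matching person only have a device id,
    // so the result needs a key space that is neither table's own key.
    static Map<String, String> outerJoin(Map<Integer, Person> persons,
                                         Map<Integer, Device> devices) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Person p : persons.values()) {
            Device d = p.deviceId == null ? null : devices.get(p.deviceId);
            result.put("person:" + p.id, p.name + "+" + (d == null ? "null" : d.type));
        }
        for (Device d : devices.values()) {
            boolean matched = d.personId != null && persons.containsKey(d.personId);
            if (!matched) {
                // Unmatched right-side row: only the device id is available.
                result.put("device:" + d.id, "null+" + d.type);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Person> persons = new LinkedHashMap<>();
        Map<Integer, Device> devices = new LinkedHashMap<>();
        persons.put(1, new Person(1, 2, "alice"));
        devices.put(2, new Device(2, 1, "phone"));
        devices.put(3, new Device(3, null, "tablet")); // no owner

        // Prints {person:1=alice+phone, device:3=null+tablet}
        System.out.println(outerJoin(persons, devices));
    }
}
```

A caller holding only a person.id or only a device.id cannot look up every
row of this result, which is the ambiguity described above.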

Thanks,
Damian




On Thu, 8 Dec 2016 at 15:51 Michael Noll <mi...@confluent.io> wrote:

> The key type returned by both KeyValueMappers (in the current trunk
> version, that type is named `R`) would need to be the same for this to
> work.
>
>
> On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <da...@gmail.com> wrote:
>
> > Michael,
> >
> > We can only support outerJoin if both tables are keyed the same way. Lets
> > say for example you can map both ways, however, the key for each table is
> > of a different type. So t1 is long and t2 is string - what is the key
> type
> > of the resulting GlobalKTable? So when you subsequently join to this
> table,
> > and do a lookup on it, which key are you using?
> >
> > Thanks,
> > Damian
> >
> > On Wed, 7 Dec 2016 at 14:31 Michael Noll <mi...@confluent.io> wrote:
> >
> > > Damian,
> > >
> > > yes, that makes sense.
> > >
> > > But I am still wondering:  In your example, there's no prior knowledge
> > "can
> > > I map from t1->t2" that Streams can leverage for joining t1 and t2
> other
> > > than blindly relying on the user to provide an appropriate
> KeyValueMapper
> > > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user
> to
> > > provide a KeyValueMapper from t1->t2 (Streams does not know at compile
> > time
> > > whether this mapping will actually work), then we can also allow the
> user
> > > to provide a corresponding "reverse" mapper from t2->t1.  That is, we
> > could
> > > say that an outer join between two global tables IS supported, but if
> and
> > > only if the user provides two KeyValueMappers, one for t1->t2 and one
> for
> > > t2->t1.
> > >
> > > The left join t1->t2 (which is supported in the KIP), in general, works
> > > only because of the existence of the user-provided KeyValueMapper from
> > > t1->t2.  The outer join, as you point out, cannot satisfied as easily
> > > because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise
> > the
> > > outer join won't work.
> > >
> > >
> > >
> > >
> > >
> > > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <da...@gmail.com>
> wrote:
> > >
> > > > Hi Michael,
> > > >
> > > > Sure. Say we have 2 input topics t1 & t2 below:
> > > > t1{
> > > >  int key;
> > > >  string t2_id;
> > > >  ...
> > > > }
> > > >
> > > > t2 {
> > > >   string key;
> > > >   ..
> > > > }
> > > > If we create global tables out of these we'd get:
> > > > GlobalKTable<Integer, ?> t1;
> > > > GlobalKTable<String, ?> t2;
> > > >
> > > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in
> order
> > to
> > > > perform the join we need to use a KeyValueMapper to extract the t2
> key
> > > from
> > > > the t1 value.
> > > >
> > > > Does that make sense?
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mi...@confluent.io>
> wrote:
> > > >
> > > > > > There is no outer-join for GlobalKTables as the tables may be
> keyed
> > > > > > differently. So you need to use the key from the left side of the
> > > join
> > > > > > along with the KeyValueMapper to resolve the right side of the
> > join.
> > > > This
> > > > > > wont work the other way around.
> > > > >
> > > > > Care to elaborate why it won't work the other way around?  If, for
> > > > example,
> > > > > we swapped the call from leftTable.join(rightTable) to
> > > > > rightTable.join(leftTable), that join would work, too.  Perhaps I
> am
> > > > > missing something though. :-)
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <da...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Matthias,
> > > > > >
> > > > > > Thanks for the feedback.
> > > > > >
> > > > > > There is no outer-join for GlobalKTables as the tables may be
> keyed
> > > > > > differently. So you need to use the key from the left side of the
> > > join
> > > > > > along with the KeyValueMapper to resolve the right side of the
> > join.
> > > > This
> > > > > > wont work the other way around.
> > > > > >
> > > > > > On the bootstrapping concern. If the application is failing
> before
> > > > > > bootstrapping finishes, the problem is likely to be related to a
> > > > terminal
> > > > > > exception, i.e., running out of disk space, corrupt state stores
> > etc.
> > > > In
> > > > > > these cases, we wouldn't want the application to continue. So i
> > think
> > > > > this
> > > > > > is ok.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <
> matthias@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > > > > >
> > > > > > > A few comments:
> > > > > > >
> > > > > > >  - why is there no outer-join for GlobalKTables
> > > > > > >  - on bootstrapping GlobalKTable, could it happen that this
> never
> > > > > > > finishes if the application fails before bootstrapping finishes
> > and
> > > > new
> > > > > > > data gets written at the same time? Do we need to guard against
> > > this
> > > > > > > (seems to be a very rare corner case, so maybe not required)?
> > > > > > >
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > >
> > > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > > > > > > Hi all,
> > > > > > > >
> > > > > > > > I would like to start the discussion on KIP-99:
> > > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > > > action?pageId=67633649
> > > > > > > >
> > > > > > > > Looking forward to your feedback.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Damian
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > *Michael G. Noll*
> > > Product Manager | Confluent
> > > +1 650 453 5860 <(650)%20453-5860> <(650)%20453-5860> | @miguno <
> https://twitter.com/miguno
> > >
> > > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > > <http://www.confluent.io/blog>
> > >
> >
>

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
The key type returned by both KeyValueMappers (in the current trunk
version, that type is named `R`) would need to be the same for this to work.
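
A sketch of what that constraint could look like, using java.util.function
types as stand-ins for KeyValueMapper and plain maps as stand-ins for the
tables. SharedKeyOuterJoin and its outerJoin method are hypothetical and
not proposed API; the only point is that both mappers share the type
parameter R, so the compiler rejects mappers with different key types. It
also assumes each mapper yields a unique key per row (otherwise the last row
wins).

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

// Illustration only: BiFunction stands in for KeyValueMapper<K, V, R>.
final class SharedKeyOuterJoin {
    // Both mappers must return the same key type R, which becomes the key
    // type of the joined result.
    static <K1, V1, K2, V2, R, VR> Map<R, VR> outerJoin(
            Map<K1, V1> left, Map<K2, V2> right,
            BiFunction<K1, V1, R> leftKey, BiFunction<K2, V2, R> rightKey,
            BiFunction<V1, V2, VR> joiner) {
        // Re-key both sides into the shared key space.
        Map<R, V1> leftByKey = new LinkedHashMap<>();
        left.forEach((k, v) -> leftByKey.put(leftKey.apply(k, v), v));
        Map<R, V2> rightByKey = new LinkedHashMap<>();
        right.forEach((k, v) -> rightByKey.put(rightKey.apply(k, v), v));

        // Outer join: every key seen on either side gets a row.
        Set<R> keys = new LinkedHashSet<>(leftByKey.keySet());
        keys.addAll(rightByKey.keySet());
        Map<R, VR> result = new LinkedHashMap<>();
        for (R key : keys) {
            result.put(key, joiner.apply(leftByKey.get(key), rightByKey.get(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, String> t1 = new LinkedHashMap<>(); // value holds the t2 key
        t1.put(1, "a");
        t1.put(2, "b");
        Map<String, String> t2 = new LinkedHashMap<>();
        t2.put("a", "alpha");
        t2.put("c", "gamma");

        Map<String, String> joined = outerJoin(
            t1, t2, (k, v) -> v, (k, v) -> k, (l, r) -> l + "/" + r);
        System.out.println(joined); // {a=a/alpha, b=b/null, c=null/gamma}
    }
}
```

Note that this sketch materializes the re-keyed result, so it does not by
itself answer how such a join would be served as a view.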


On Wed, Dec 7, 2016 at 4:46 PM, Damian Guy <da...@gmail.com> wrote:

> Michael,
>
> We can only support outerJoin if both tables are keyed the same way. Lets
> say for example you can map both ways, however, the key for each table is
> of a different type. So t1 is long and t2 is string - what is the key type
> of the resulting GlobalKTable? So when you subsequently join to this table,
> and do a lookup on it, which key are you using?
>
> Thanks,
> Damian
>
> On Wed, 7 Dec 2016 at 14:31 Michael Noll <mi...@confluent.io> wrote:
>
> > Damian,
> >
> > yes, that makes sense.
> >
> > But I am still wondering:  In your example, there's no prior knowledge
> "can
> > I map from t1->t2" that Streams can leverage for joining t1 and t2 other
> > than blindly relying on the user to provide an appropriate KeyValueMapper
> > for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user to
> > provide a KeyValueMapper from t1->t2 (Streams does not know at compile
> time
> > whether this mapping will actually work), then we can also allow the user
> > to provide a corresponding "reverse" mapper from t2->t1.  That is, we
> could
> > say that an outer join between two global tables IS supported, but if and
> > only if the user provides two KeyValueMappers, one for t1->t2 and one for
> > t2->t1.
> >
> > The left join t1->t2 (which is supported in the KIP), in general, works
> > only because of the existence of the user-provided KeyValueMapper from
> > t1->t2.  The outer join, as you point out, cannot satisfied as easily
> > because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise
> the
> > outer join won't work.
> >
> >
> >
> >
> >
> > On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <da...@gmail.com> wrote:
> >
> > > Hi Michael,
> > >
> > > Sure. Say we have 2 input topics t1 & t2 below:
> > > t1{
> > >  int key;
> > >  string t2_id;
> > >  ...
> > > }
> > >
> > > t2 {
> > >   string key;
> > >   ..
> > > }
> > > If we create global tables out of these we'd get:
> > > GlobalKTable<Integer, ?> t1;
> > > GlobalKTable<String, ?> t2;
> > >
> > > So the join can only go in 1 direction, i.e, from t1 -> t2 as in order
> to
> > > perform the join we need to use a KeyValueMapper to extract the t2 key
> > from
> > > the t1 value.
> > >
> > > Does that make sense?
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Wed, 7 Dec 2016 at 10:44 Michael Noll <mi...@confluent.io> wrote:
> > >
> > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > > > differently. So you need to use the key from the left side of the
> > join
> > > > > along with the KeyValueMapper to resolve the right side of the
> join.
> > > This
> > > > > wont work the other way around.
> > > >
> > > > Care to elaborate why it won't work the other way around?  If, for
> > > example,
> > > > we swapped the call from leftTable.join(rightTable) to
> > > > rightTable.join(leftTable), that join would work, too.  Perhaps I am
> > > > missing something though. :-)
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <da...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Matthias,
> > > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > There is no outer-join for GlobalKTables as the tables may be keyed
> > > > > differently. So you need to use the key from the left side of the
> > join
> > > > > along with the KeyValueMapper to resolve the right side of the
> join.
> > > This
> > > > > wont work the other way around.
> > > > >
> > > > > On the bootstrapping concern. If the application is failing before
> > > > > bootstrapping finishes, the problem is likely to be related to a
> > > terminal
> > > > > exception, i.e., running out of disk space, corrupt state stores
> etc.
> > > In
> > > > > these cases, we wouldn't want the application to continue. So i
> think
> > > > this
> > > > > is ok.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <matthias@confluent.io
> >
> > > > wrote:
> > > > >
> > > > > > Thanks for the KIP Damian. Very nice motivating example!
> > > > > >
> > > > > > A few comments:
> > > > > >
> > > > > >  - why is there no outer-join for GlobalKTables
> > > > > >  - on bootstrapping GlobalKTable, could it happen that this never
> > > > > > finishes if the application fails before bootstrapping finishes
> and
> > > new
> > > > > > data gets written at the same time? Do we need to guard against
> > this
> > > > > > (seems to be a very rare corner case, so maybe not required)?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > >
> > > > > > On 12/6/16 2:09 AM, Damian Guy wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I would like to start the discussion on KIP-99:
> > > > > > >
> > > > > > https://cwiki.apache.org/confluence/pages/viewpage.
> > > > > action?pageId=67633649
> > > > > > >
> > > > > > > Looking forward to your feedback.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Damian
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> >
> > --
> > *Michael G. Noll*
> > Product Manager | Confluent
> > +1 650 453 5860 | @miguno <https://twitter.com/miguno>
> >
> > Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
> > <http://www.confluent.io/blog>
> >
>

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Michael,

We can only support outerJoin if both tables are keyed the same way. Let's
say, for example, that you can map both ways, but the key for each table is
of a different type. So t1 is long and t2 is string: what is the key type
of the resulting GlobalKTable? And when you subsequently join to this table
and do a lookup on it, which key are you using?

Thanks,
Damian

On Wed, 7 Dec 2016 at 14:31 Michael Noll <mi...@confluent.io> wrote:

> Damian,
>
> yes, that makes sense.
>
> But I am still wondering:  In your example, there's no prior knowledge "can
> I map from t1->t2" that Streams can leverage for joining t1 and t2 other
> than blindly relying on the user to provide an appropriate KeyValueMapper
> for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user to
> provide a KeyValueMapper from t1->t2 (Streams does not know at compile time
> whether this mapping will actually work), then we can also allow the user
> to provide a corresponding "reverse" mapper from t2->t1.  That is, we could
> say that an outer join between two global tables IS supported, but if and
> only if the user provides two KeyValueMappers, one for t1->t2 and one for
> t2->t1.
>
> The left join t1->t2 (which is supported in the KIP), in general, works
> only because of the existence of the user-provided KeyValueMapper from
> t1->t2.  The outer join, as you point out, cannot be satisfied as easily
> because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise the
> outer join won't work.

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
Damian,

yes, that makes sense.

But I am still wondering:  In your example, there's no prior knowledge "can
I map from t1->t2" that Streams can leverage for joining t1 and t2 other
than blindly relying on the user to provide an appropriate KeyValueMapper
for K1/V1 of t1 -> K2/V2 of t2.  In other words, if we allow the user to
provide a KeyValueMapper from t1->t2 (Streams does not know at compile time
whether this mapping will actually work), then we can also allow the user
to provide a corresponding "reverse" mapper from t2->t1.  That is, we could
say that an outer join between two global tables IS supported, but if and
only if the user provides two KeyValueMappers, one for t1->t2 and one for
t2->t1.

The left join t1->t2 (which is supported in the KIP), in general, works
only because of the existence of the user-provided KeyValueMapper from
t1->t2.  The outer join, as you point out, cannot be satisfied as easily
because Streams must know two mappers, t1->t2 plus t2->t1 -- otherwise the
outer join won't work.





On Wed, Dec 7, 2016 at 3:04 PM, Damian Guy <da...@gmail.com> wrote:

> Hi Michael,
>
> Sure. Say we have 2 input topics t1 & t2 below:
> t1{
>  int key;
>  string t2_id;
>  ...
> }
>
> t2 {
>   string key;
>   ..
> }
> If we create global tables out of these we'd get:
> GlobalKTable<Integer, ?> t1;
> GlobalKTable<String, ?> t2;
>
> So the join can only go in 1 direction, i.e., from t1 -> t2, because in
> order to perform the join we need to use a KeyValueMapper to extract the
> t2 key from the t1 value.
>
> Does that make sense?
>
> Thanks,
> Damian



-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno <https://twitter.com/miguno>
Follow us: Twitter <https://twitter.com/ConfluentInc> | Blog
<http://www.confluent.io/blog>

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Michael,

Sure. Say we have 2 input topics t1 & t2 below:
t1{
 int key;
 string t2_id;
 ...
}

t2 {
  string key;
  ..
}
If we create global tables out of these we'd get:
GlobalKTable<Integer, ?> t1;
GlobalKTable<String, ?> t2;

So the join can only go in 1 direction, i.e., from t1 -> t2, because in
order to perform the join we need to use a KeyValueMapper to extract the
t2 key from the t1 value.
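
As a rough sketch of the above, using plain Java maps in place of the global
tables: the `T1Value` type, the `leftJoin` method and the sample data are all
made up for illustration and are not the KIP API. The mapper pulls the t2 key
out of each t1 entry, and the result stays keyed by the t1 key.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class GlobalJoinSketch {
    // Hypothetical t1 value: it carries the key of the matching t2 row.
    static final class T1Value {
        final String t2Id;
        final String payload;
        T1Value(String t2Id, String payload) {
            this.t2Id = t2Id;
            this.payload = payload;
        }
    }

    // Left join t1 -> t2: keyMapper derives the t2 key from a t1 entry,
    // then that key is looked up in t2. Output is keyed like t1.
    static Map<Integer, String> leftJoin(
            Map<Integer, T1Value> t1,
            Map<String, String> t2,
            BiFunction<Integer, T1Value, String> keyMapper) {
        Map<Integer, String> joined = new LinkedHashMap<>();
        for (Map.Entry<Integer, T1Value> e : t1.entrySet()) {
            String t2Key = keyMapper.apply(e.getKey(), e.getValue());
            String right = t2.get(t2Key); // null when t2 has no such key
            joined.put(e.getKey(), e.getValue().payload + "/" + right);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, T1Value> t1 = new LinkedHashMap<>();
        t1.put(1, new T1Value("a", "order-1"));
        t1.put(2, new T1Value("zzz", "order-2"));
        Map<String, String> t2 = new LinkedHashMap<>();
        t2.put("a", "customer-a");
        System.out.println(leftJoin(t1, t2, (k, v) -> v.t2Id));
    }
}
```

Going from t2 -> t1 would need a way to derive an Integer t1 key from a t2
entry, and in this example t2 does not carry one.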

Does that make sense?

Thanks,
Damian

On Wed, 7 Dec 2016 at 10:44 Michael Noll <mi...@confluent.io> wrote:

> > There is no outer-join for GlobalKTables as the tables may be keyed
> > differently. So you need to use the key from the left side of the join
> > along with the KeyValueMapper to resolve the right side of the join. This
> > won't work the other way around.
>
> Care to elaborate why it won't work the other way around?  If, for example,
> we swapped the call from leftTable.join(rightTable) to
> rightTable.join(leftTable), that join would work, too.  Perhaps I am
> missing something though. :-)

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Michael Noll <mi...@confluent.io>.
> There is no outer-join for GlobalKTables as the tables may be keyed
> differently. So you need to use the key from the left side of the join
> along with the KeyValueMapper to resolve the right side of the join. This
> won't work the other way around.

Care to elaborate why it won't work the other way around?  If, for example,
we swapped the call from leftTable.join(rightTable) to
rightTable.join(leftTable), that join would work, too.  Perhaps I am
missing something though. :-)




On Wed, Dec 7, 2016 at 10:39 AM, Damian Guy <da...@gmail.com> wrote:

> Hi Matthias,
>
> Thanks for the feedback.
>
> There is no outer-join for GlobalKTables as the tables may be keyed
> differently. So you need to use the key from the left side of the join
> along with the KeyValueMapper to resolve the right side of the join. This
> won't work the other way around.
>
> On the bootstrapping concern: if the application is failing before
> bootstrapping finishes, the problem is likely to be related to a terminal
> exception, e.g., running out of disk space, corrupt state stores, etc. In
> these cases, we wouldn't want the application to continue. So I think this
> is ok.
>
> Thanks,
> Damian

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by Damian Guy <da...@gmail.com>.
Hi Matthias,

Thanks for the feedback.

There is no outer-join for GlobalKTables as the tables may be keyed
differently. So you need to use the key from the left side of the join
along with the KeyValueMapper to resolve the right side of the join. This
won't work the other way around.

On the bootstrapping concern: if the application is failing before
bootstrapping finishes, the problem is likely to be related to a terminal
exception, e.g., running out of disk space, corrupt state stores, etc. In
these cases, we wouldn't want the application to continue. So I think this
is ok.

Thanks,
Damian

On Tue, 6 Dec 2016 at 21:56 Matthias J. Sax <ma...@confluent.io> wrote:

> Thanks for the KIP Damian. Very nice motivating example!
>
> A few comments:
>
>  - why is there no outer-join for GlobalKTables?
>  - on bootstrapping GlobalKTable, could it happen that this never
> finishes if the application fails before bootstrapping finishes and new
> data gets written at the same time? Do we need to guard against this
> (seems to be a very rare corner case, so maybe not required)?
>
>
> -Matthias

Re: [DISCUSS] KIP-99: Add Global Tables to Kafka Streams

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Thanks for the KIP Damian. Very nice motivating example!

A few comments:

 - why is there no outer-join for GlobalKTables?
 - on bootstrapping GlobalKTable, could it happen that this never
finishes if the application fails before bootstrapping finishes and new
data gets written at the same time? Do we need to guard against this
(seems to be a very rare corner case, so maybe not required)?


-Matthias


On 12/6/16 2:09 AM, Damian Guy wrote:
> Hi all,
> 
> I would like to start the discussion on KIP-99:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=67633649
> 
> Looking forward to your feedback.
> 
> Thanks,
> Damian
>