Posted to users@kafka.apache.org by karan alang <ka...@gmail.com> on 2017/10/22 19:30:16 UTC

left join between PageViews(KStream) & UserProfile (KTable)

Hello all -
I'm trying to run the sample PageView Kafka Streams example
(ref -
https://github.com/apache/kafka/tree/0.11.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
)
and have a question.

There is a leftJoin between PageView (KStream) and UserProfile (KTable), as
shown below. The join should produce PageViewByRegion.

*What key should the join be on?*

*There seems to be no common key (e.g. user) between the 2 classes -
PageView and UserProfile.*

*Also, what should sample input data in the PageView & UserProfile
topics look like?*
Do we need to add a surrogate (id) key to the input for these, to enable the
left join?

Code:

*------------- static classes ------------*

    static public class PageView {
        public String user;
        public String page;
        public Long timestamp;
    }

    static public class UserProfile {
        public String region;
        public Long timestamp;
    }

*---------- Join between the views (i.e. KStream - PageView) & users (KTable
- UserProfile) ----------*


    KStream<String, PageViewByRegion> kstream1 = views.leftJoin(users,
        // PageView - first value type
        // UserProfile - second value type
        // PageViewByRegion - joined value type
        new ValueJoiner<PageView, UserProfile, PageViewByRegion>() {
            @Override
            public PageViewByRegion apply(PageView view, UserProfile profile) {
                PageViewByRegion viewByRegion = new PageViewByRegion();
                viewByRegion.user = view.user;
                viewByRegion.page = view.page;
                System.out.println(" viewByRegion.user " + viewByRegion.user);
                System.out.println(" viewByRegion.page " + viewByRegion.page);

                // profile is null when the KTable has no entry for this key
                if (profile != null) {
                    viewByRegion.region = profile.region;
                } else {
                    viewByRegion.region = "UNKNOWN";
                }

                System.out.println(" viewByRegion.region " + viewByRegion.region);

                return viewByRegion;
            }
        }
    );

Re: left join between PageViews(KStream) & UserProfile (KTable)

Posted by karan alang <ka...@gmail.com>.
Thanks for clarifying that!


On Mon, Oct 23, 2017 at 3:31 AM, Michael Noll <mi...@confluent.io> wrote:

> > *What key should the join be on?*
>
> The message key, in both cases, should contain the user ID in String
> format.
>
> > *There seems to be no common key (e.g. user) between the 2 classes -
> > PageView and UserProfile*
>
> The user ID is the common key, but the user ID is stored in the respective
> message *keys*, whereas PageView and UserProfile values are stored in the
> message *values* in Kafka.
>
> Btw, you might also want to take a look at
> https://github.com/confluentinc/kafka-streams-examples, which has many
> more
> examples, including a PageView demo called `PageViewRegionLambdaExample`
> that is similar to the one you shared above.  `PageViewRegionLambdaExample`
> ships with a data generator for the example, and instructions for how to
> run it.
>
> It would be great if we had the same setup for the Apache Kafka page view
> example, of course.  Pull requests are very welcome. :-)
>
> Hope this helps,
> Michael

Re: left join between PageViews(KStream) & UserProfile (KTable)

Posted by Michael Noll <mi...@confluent.io>.
> *What key should the join be on?*

The message key, in both cases, should contain the user ID in String format.

> *There seems to be no common key (e.g. user) between the 2 classes -
> PageView and UserProfile*

The user ID is the common key, but the user ID is stored in the respective
message *keys*, whereas PageView and UserProfile values are stored in the
message *values* in Kafka.
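
To make the key/value split concrete, here is a minimal plain-Java sketch (no
Kafka dependency) of what the left join does conceptually. The `Keyed` wrapper,
the `leftJoin` helper, and the sample users `alice` and `bob` are hypothetical
names made up for illustration; they are not part of the Kafka Streams API or
of the example in this thread. A real KStream-KTable join also depends on what
the table holds at the moment each stream record is processed, which this
sketch ignores.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a key-based left join; an illustration only, not Kafka code.
class LeftJoinSketch {

    // Value types from the thread. Note that UserProfile carries no user field.
    static class PageView { String user; String page; Long timestamp; }
    static class UserProfile { String region; Long timestamp; }
    static class PageViewByRegion { String user; String page; String region; }

    // Hypothetical stand-in for a Kafka record: key = user ID, value = payload.
    static class Keyed<V> {
        final String key;
        final V value;
        Keyed(String key, V value) { this.key = key; this.value = value; }
    }

    // Same logic as the ValueJoiner in the original post.
    static PageViewByRegion joinValues(PageView view, UserProfile profile) {
        PageViewByRegion out = new PageViewByRegion();
        out.user = view.user;
        out.page = view.page;
        out.region = (profile != null) ? profile.region : "UNKNOWN";
        return out;
    }

    // For each page view, look up the profile stored under the same message key.
    static List<Keyed<PageViewByRegion>> leftJoin(List<Keyed<PageView>> views,
                                                  Map<String, UserProfile> users) {
        List<Keyed<PageViewByRegion>> joined = new ArrayList<>();
        for (Keyed<PageView> record : views) {
            UserProfile profile = users.get(record.key); // null if no profile for this key
            joined.add(new Keyed<>(record.key, joinValues(record.value, profile)));
        }
        return joined;
    }

    static PageView view(String user, String page) {
        PageView v = new PageView();
        v.user = user;
        v.page = page;
        v.timestamp = 0L; // placeholder timestamp
        return v;
    }

    static UserProfile profile(String region) {
        UserProfile p = new UserProfile();
        p.region = region;
        p.timestamp = 0L; // placeholder timestamp
        return p;
    }

    // Sample input: every record is keyed by the user ID.
    static List<Keyed<PageViewByRegion>> demo() {
        Map<String, UserProfile> users = new HashMap<>();
        users.put("alice", profile("europe")); // only alice has a profile

        List<Keyed<PageView>> views = new ArrayList<>();
        views.add(new Keyed<>("alice", view("alice", "index.html")));
        views.add(new Keyed<>("bob", view("bob", "news.html")));
        return leftJoin(views, users);
    }

    public static void main(String[] args) {
        for (Keyed<PageViewByRegion> r : demo()) {
            System.out.println(r.key + " -> " + r.value.page + " / " + r.value.region);
        }
    }
}
```

Running it prints `alice -> index.html / europe` and `bob -> news.html / UNKNOWN`:
the view keyed `bob` has no matching profile, so the joiner receives null and
falls back to "UNKNOWN", as in the ValueJoiner from the original post. No
surrogate ID inside the values is needed as long as both topics use the user ID
as the message key.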

Btw, you might also want to take a look at
https://github.com/confluentinc/kafka-streams-examples, which has many more
examples, including a PageView demo called `PageViewRegionLambdaExample`
that is similar to the one you shared above.  `PageViewRegionLambdaExample`
ships with a data generator for the example, and instructions for how to
run it.

It would be great if we had the same setup for the Apache Kafka page view
example, of course.  Pull requests are very welcome. :-)

Hope this helps,
Michael





