Posted to users@kafka.apache.org by Shekar Tippur <ct...@gmail.com> on 2017/06/25 07:45:49 UTC

KStreams to KTable join

Hello,

I am having trouble implementing a KStream-to-KTable join.

I have two POJOs, one representing the stream records and the other the
table records. The raw topic contains the stream data and the cache topic
contains the table data. The join is not happening, since the print
statement inside the joiner is never called.

Appreciate any pointers.

- Shekar

raw.leftJoin(cache, new ValueJoiner<RawPOJOClass, CachePOJOClass, RawPOJOClass>() {

    @Override
    public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
        // On a left join, c is null when the stream record's key has
        // no match in the KTable; pass the record through unchanged.
        if (c == null) {
            return r;
        }

        String src = r.getSource();
        String snowSrc = c.getSnowHost();

        if (src.matches(snowSrc)) {
            System.out.println("In apply code");
            Custom custom = new Custom();
            // Attach the cache record's fields under the "custom" property.
            custom.setAdditionalProperty("custom", c.getAll());
            r.setCustom(custom);
        }
        return r;
    }
}).to("parser");

Re: KStreams to KTable join

Posted by Shekar Tippur <ct...@gmail.com>.
Guozhang,

Thanks for responding.
The raw and cache keys are null. Both the KStream and KTable entries are JSON.


Here is the input to the cache topic (KTable):

{"user_name": "Joe", "location": "US", "gender": "male"}
{"user_name": "Julie", "location": "US", "gender": "female"}
{"user_name": "Kawasaki", "location": "Japan", "gender": "male"}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

I want the join to output:

{"user": "Joe", "custom": {"choice":"vegan","enriched":{"location": "US", "gender": "male"}}}

I want to take what is in the KTable and add it to the "enriched" section
of the output stream.


KStream<String, RawPOJOClass> raw =
    builder.stream(Serdes.String(), rawSerde, "raw");
KTable<String, CachePOJOClass> cache =
    builder.table("cache", "local-cache");


raw.leftJoin(cache, new ValueJoiner<RawPOJOClass, CachePOJOClass, RawPOJOClass>() {

    @Override
    public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
        // On a left join, c is null when the stream record's key has
        // no match in the KTable; pass the record through unchanged.
        if (c == null) {
            return r;
        }

        String src = r.getSource();
        String snowSrc = c.getSnowHost();

        if (src.matches(snowSrc)) {
            System.out.println("In apply code");
            Custom custom = new Custom();
            // Attach the cache record's fields under the "custom" property.
            custom.setAdditionalProperty("custom", c.getAll());
            r.setCustom(custom);
        }
        return r;
    }
}).to("parser");

-------

I tried adding a key but I get a serializer error. For example, cache entry:
"joe", {"user_name": "Joe", "location": "US", "gender": "male"}

Raw entry:

"joe", {"user": "Joe", "custom": {"choice":"vegan"}}


Here is the error:

com.fasterxml.jackson.databind.JsonMappingException: Can not construct
instance of com.intuit.argos_streams.system.SnowServerPOJOClass: no
String-argument constructor/factory method to deserialize from String
value ('joe')
 at [Source: [B@5dfc53f4; line: 1, column: 1]
    at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:270)
    at com.fasterxml.jackson.databind.DeserializationContext.instantiationException(DeserializationContext.java:1456)
    at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1012)
    at com.fasterxml.jackson.databind.deser.ValueInstantiator._createFromStringFallbacks(ValueInstantiator.java:370)
    at com.fasterxml.jackson.databind.deser.std.StdValueInstantiator.createFromString(StdValueInstantiator.java:315)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromString(BeanDeserializerBase.java:1282)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:150)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3798)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2929)
    at com.intuit.argos_streams.system.SnowServerDeserialzer.deserialize(SnowServerDeserialzer.java:40)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:46)
    at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:44)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:158)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:605)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
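
If I read the trace right, the bare key string 'joe' is being handed to the
Jackson-based POJO deserializer. A minimal sketch of the config change I
believe is needed, making the default key serde a plain String serde (the
POJO serdes would then be passed per topic, as in the code above):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Keys such as "joe" are plain strings, so the default key serde must
// be a String serde rather than the JSON POJO deserializer.
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
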
- Shekar


On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Shekar,
>
> Could you demonstrate your input data. More specifically, what are the key
> types of your input streams, and are they not-null values? It seems the
> root cause is similar to the other thread you asked on the mailing list.
>
> Also, could you provide your used Kafka Streams version?
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Hello,
> >
> > I am having trouble implementing streams to table join.
> >
> > I have 2 POJO's each representing streams and table data structures. raw
> > topic contains streams and cache topic contains table structure. The join
> > is not happening since the print statement is not being called.
> >
> > Appreciate any pointers.
> >
> > - Shekar
> >
> > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > CachePOJOClass,RawPOJOClass>() {
> >
> >     @Override
> >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> >
> >         String src=r.getSource();
> >         String cSrc=c.getSnowHost();
> >         Custom custom=new Custom();
> >
> >         if (src.matches(snowSrc)){
> >             System.out.println("In apply code");
> >             custom.setAdditionalProperty("custom",cSrc.getAll());
> >             r.setCustom(custom);
> >         }
> >         return r;
> >     }
> > }).to("parser");
> >
>
>
>
> --
> -- Guozhang
>

Re: KStreams to KTable join

Posted by Shekar Tippur <ct...@gmail.com>.
Guozhang,

"1) if the coming record's key is null, then when it flows into the join
processor inside the topology this record will be dropped as it cannot be
joined with any records from the other stream."

Can you please elaborate on the notion of key? By keys, do you mean Kafka
partition keys?
For a JSON KStream-to-KTable example, can you please show me a sample input?

For me, the KTable has:

{"user_name": "Joe", "location": "US", "gender": "male"}
{"user_name": "Julie", "location": "US", "gender": "female"}

{"user_name": "Kawasaki", "location": "Japan", "gender": "male"}

The KStream gets an event:

{"user": "Joe", "custom": {"choice":"vegan"}}

Is this data right, or do I need to have a key and then a JSON value, as in:


"joe", {"user_name": "Joe", "location": "US", "gender": "male"}

- Shekar


On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wa...@gmail.com> wrote:

> I think your issue is in two folds:
>
> 1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream.
>
> 2) the NPE you are getting when giving it the non-null keyed record seems
> because, you are using "SnowServerDeserialzer" (is it set as the default
> key deserializer) which expects a SnowServerPOJOClass while the key "joe"
> is typed String. You need to override the key deserialize when constructing
> the "cache" KTable as well:
>
> ----------------
> KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
> rawSerde, "cache", "local-cache");
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Guozhang
> >
> > I am using 0.10.2.1 version
> >
> > - Shekar
> >
> > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > Could you demonstrate your input data. More specifically, what are the
> > key
> > > types of your input streams, and are they not-null values? It seems the
> > > root cause is similar to the other thread you asked on the mailing
> list.
> > >
> > > Also, could you provide your used Kafka Streams version?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am having trouble implementing streams to table join.
> > > >
> > > > I have 2 POJO's each representing streams and table data structures.
> > raw
> > > > topic contains streams and cache topic contains table structure. The
> > join
> > > > is not happening since the print statement is not being called.
> > > >
> > > > Appreciate any pointers.
> > > >
> > > > - Shekar
> > > >
> > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > CachePOJOClass,RawPOJOClass>() {
> > > >
> > > >     @Override
> > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > >
> > > >         String src=r.getSource();
> > > >         String cSrc=c.getSnowHost();
> > > >         Custom custom=new Custom();
> > > >
> > > >         if (src.matches(snowSrc)){
> > > >             System.out.println("In apply code");
> > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > >             r.setCustom(custom);
> > > >         }
> > > >         return r;
> > > >     }
> > > > }).to("parser");
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: KStreams to KTable join

Posted by Shekar Tippur <ct...@gmail.com>.
If I understand it right, I need to pass the key and ensure that I handle
it correctly in the POJO constructor.
Let me give it a shot.

- Shekar

On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wa...@gmail.com> wrote:

> I think your issue is in two folds:
>
> 1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream.
>
> 2) the NPE you are getting when giving it the non-null keyed record seems
> because, you are using "SnowServerDeserialzer" (is it set as the default
> key deserializer) which expects a SnowServerPOJOClass while the key "joe"
> is typed String. You need to override the key deserialize when constructing
> the "cache" KTable as well:
>
> ----------------
> KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
> rawSerde, "cache", "local-cache");
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Guozhang
> >
> > I am using 0.10.2.1 version
> >
> > - Shekar
> >
> > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com>
> > wrote:
> >
> > > Hi Shekar,
> > >
> > > Could you demonstrate your input data. More specifically, what are the
> > key
> > > types of your input streams, and are they not-null values? It seems the
> > > root cause is similar to the other thread you asked on the mailing
> list.
> > >
> > > Also, could you provide your used Kafka Streams version?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > I am having trouble implementing streams to table join.
> > > >
> > > > I have 2 POJO's each representing streams and table data structures.
> > raw
> > > > topic contains streams and cache topic contains table structure. The
> > join
> > > > is not happening since the print statement is not being called.
> > > >
> > > > Appreciate any pointers.
> > > >
> > > > - Shekar
> > > >
> > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > CachePOJOClass,RawPOJOClass>() {
> > > >
> > > >     @Override
> > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > >
> > > >         String src=r.getSource();
> > > >         String cSrc=c.getSnowHost();
> > > >         Custom custom=new Custom();
> > > >
> > > >         if (src.matches(snowSrc)){
> > > >             System.out.println("In apply code");
> > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > >             r.setCustom(custom);
> > > >         }
> > > >         return r;
> > > >     }
> > > > }).to("parser");
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Re: KStreams to KTable join

Posted by Guozhang Wang <wa...@gmail.com>.
When a KStream or KTable is created from a source topic, each record is a
key-value pair, and the key is read from Kafka as the message key.

What you showed in JSON seems to be only the value of each message; hence
I'm asking what the key of the message is, which will become the key of the
streams record.
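
For example, you can read the topic back with a plain consumer and print
the key next to the value; a minimal sketch (topic name, group id, and
bootstrap server are placeholders):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "key-inspector");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("cache"));
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
    // A null key printed here is what makes the join drop the record.
    System.out.println("key=" + record.key() + "  value=" + record.value());
}
consumer.close();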


Guozhang

On Thu, Jun 29, 2017 at 2:34 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Guozhang,
>
> "1) if the coming record's key is null, then when it flows into the join
> processor inside the topology this record will be dropped as it cannot be
> joined with any records from the other stream."
>
> Can you please elaborate on the notion of key? By keys, do you mean kafka
> partition keys?
> For a json kstream to ktable example, can you please show me a sample
> input?
>
> For me, the ktable has:
>
> {"user_name": "Joe", "location": "US", "gender": "male"}
> {"user_name": "Julie", "location": "US", "gender": "female"}
>
> {"user_name": "Kawasaki", "location": "Japan", "gender": "male"}
>
> The kstream gets a event (KStreams)
>
> {"user": "Joe", "custom": {"choice":"vegan"}}
>
> Is this data right or do I need to have a key and then a json - as in:
>
>
> "joe", {"user_name": "Joe", "location": "US", "gender": "male"}
>
>
>
>
> On Mon, Jun 26, 2017 at 4:42 PM, Guozhang Wang <wa...@gmail.com> wrote:
>
> > I think your issue is in two folds:
> >
> > 1) if the coming record's key is null, then when it flows into the join
> > processor inside the topology this record will be dropped as it cannot be
> > joined with any records from the other stream.
> >
> > 2) the NPE you are getting when giving it the non-null keyed record seems
> > because, you are using "SnowServerDeserialzer" (is it set as the default
> > key deserializer) which expects a SnowServerPOJOClass while the key "joe"
> > is typed String. You need to override the key deserialize when
> constructing
> > the "cache" KTable as well:
> >
> > ----------------
> > KTable <String, CachePOJOClass > cache = builder.table(Serdes.String(),
> > rawSerde, "cache", "local-cache");
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ct...@gmail.com>
> wrote:
> >
> > > Guozhang
> > >
> > > I am using 0.10.2.1 version
> > >
> > > - Shekar
> > >
> > > On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com>
> > > wrote:
> > >
> > > > Hi Shekar,
> > > >
> > > > Could you demonstrate your input data. More specifically, what are
> the
> > > key
> > > > types of your input streams, and are they not-null values? It seems
> the
> > > > root cause is similar to the other thread you asked on the mailing
> > list.
> > > >
> > > > Also, could you provide your used Kafka Streams version?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com>
> > > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I am having trouble implementing streams to table join.
> > > > >
> > > > > I have 2 POJO's each representing streams and table data
> structures.
> > > raw
> > > > > topic contains streams and cache topic contains table structure.
> The
> > > join
> > > > > is not happening since the print statement is not being called.
> > > > >
> > > > > Appreciate any pointers.
> > > > >
> > > > > - Shekar
> > > > >
> > > > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > > > CachePOJOClass,RawPOJOClass>() {
> > > > >
> > > > >     @Override
> > > > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > > > >
> > > > >         String src=r.getSource();
> > > > >         String cSrc=c.getSnowHost();
> > > > >         Custom custom=new Custom();
> > > > >
> > > > >         if (src.matches(snowSrc)){
> > > > >             System.out.println("In apply code");
> > > > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > > > >             r.setCustom(custom);
> > > > >         }
> > > > >         return r;
> > > > >     }
> > > > > }).to("parser");
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: KStreams to KTable join

Posted by Guozhang Wang <wa...@gmail.com>.
I think your issue is twofold:

1) If the incoming record's key is null, then when it flows into the join
processor inside the topology, the record is dropped, as it cannot be
joined with any record from the other stream.

2) The exception you are getting with the non-null keyed record seems to be
because you are using "SnowServerDeserialzer" (is it set as the default
key deserializer?), which expects a SnowServerPOJOClass while the key "joe"
is a String. You need to override the key deserializer when constructing
the "cache" KTable as well:

----------------
KTable<String, CachePOJOClass> cache = builder.table(Serdes.String(),
rawSerde, "cache", "local-cache");


Guozhang


On Sun, Jun 25, 2017 at 11:30 PM, Shekar Tippur <ct...@gmail.com> wrote:

> Guozhang
>
> I am using 0.10.2.1 version
>
> - Shekar
>
> On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com>
> wrote:
>
> > Hi Shekar,
> >
> > Could you demonstrate your input data. More specifically, what are the
> key
> > types of your input streams, and are they not-null values? It seems the
> > root cause is similar to the other thread you asked on the mailing list.
> >
> > Also, could you provide your used Kafka Streams version?
> >
> >
> > Guozhang
> >
> >
> > On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com>
> wrote:
> >
> > > Hello,
> > >
> > > I am having trouble implementing streams to table join.
> > >
> > > I have 2 POJO's each representing streams and table data structures.
> raw
> > > topic contains streams and cache topic contains table structure. The
> join
> > > is not happening since the print statement is not being called.
> > >
> > > Appreciate any pointers.
> > >
> > > - Shekar
> > >
> > > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > > CachePOJOClass,RawPOJOClass>() {
> > >
> > >     @Override
> > >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> > >
> > >         String src=r.getSource();
> > >         String cSrc=c.getSnowHost();
> > >         Custom custom=new Custom();
> > >
> > >         if (src.matches(snowSrc)){
> > >             System.out.println("In apply code");
> > >             custom.setAdditionalProperty("custom",cSrc.getAll());
> > >             r.setCustom(custom);
> > >         }
> > >         return r;
> > >     }
> > > }).to("parser");
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Re: KStreams to KTable join

Posted by Shekar Tippur <ct...@gmail.com>.
Guozhang

I am using version 0.10.2.1

- Shekar

On Sun, Jun 25, 2017 at 10:36 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Shekar,
>
> Could you demonstrate your input data. More specifically, what are the key
> types of your input streams, and are they not-null values? It seems the
> root cause is similar to the other thread you asked on the mailing list.
>
> Also, could you provide your used Kafka Streams version?
>
>
> Guozhang
>
>
> On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com> wrote:
>
> > Hello,
> >
> > I am having trouble implementing streams to table join.
> >
> > I have 2 POJO's each representing streams and table data structures. raw
> > topic contains streams and cache topic contains table structure. The join
> > is not happening since the print statement is not being called.
> >
> > Appreciate any pointers.
> >
> > - Shekar
> >
> > raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> > CachePOJOClass,RawPOJOClass>() {
> >
> >     @Override
> >     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
> >
> >         String src=r.getSource();
> >         String cSrc=c.getSnowHost();
> >         Custom custom=new Custom();
> >
> >         if (src.matches(snowSrc)){
> >             System.out.println("In apply code");
> >             custom.setAdditionalProperty("custom",cSrc.getAll());
> >             r.setCustom(custom);
> >         }
> >         return r;
> >     }
> > }).to("parser");
> >
>
>
>
> --
> -- Guozhang
>

Re: KStreams to KTable join

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Shekar,

Could you share your input data? More specifically, what are the key
types of your input streams, and are the keys non-null? It seems the
root cause is similar to the other thread you asked about on the mailing list.

Also, could you provide the Kafka Streams version you are using?


Guozhang


On Sun, Jun 25, 2017 at 12:45 AM, Shekar Tippur <ct...@gmail.com> wrote:

> Hello,
>
> I am having trouble implementing streams to table join.
>
> I have 2 POJO's each representing streams and table data structures. raw
> topic contains streams and cache topic contains table structure. The join
> is not happening since the print statement is not being called.
>
> Appreciate any pointers.
>
> - Shekar
>
> raw.leftJoin(cache, new ValueJoiner<RawPOJOClass,
> CachePOJOClass,RawPOJOClass>() {
>
>     @Override
>     public RawPOJOClass apply(RawPOJOClass r, CachePOJOClass c) {
>
>         String src=r.getSource();
>         String cSrc=c.getSnowHost();
>         Custom custom=new Custom();
>
>         if (src.matches(snowSrc)){
>             System.out.println("In apply code");
>             custom.setAdditionalProperty("custom",cSrc.getAll());
>             r.setCustom(custom);
>         }
>         return r;
>     }
> }).to("parser");
>



-- 
-- Guozhang