Posted to users@kafka.apache.org by Avi Flax <av...@parkassist.com> on 2016/06/10 18:24:01 UTC

KStreams.reduceByKey passing nulls to my Deserializer?

Hi, I’m using Kafka Streams (0.10.0) with JRuby; most of my scripts/nodes are working well at this point, except for one which uses reduceByKey.

This is the first time I’m trying to use the local state store, so it’s possible something is misconfigured; I’m not sure. My config is pretty vanilla and minimal.

My debugging so far shows that reduceByKey is passing nil/null values to my Deserializer. I wasn’t expecting this and my Deserializer is currently raising exceptions in this case.

I guess I’d like to know — is this normal, expected behavior? If so, why, and what does it mean, and how am I meant to handle it?

If not, any idea why it might be happening?

My stack trace is pretty crazy due to JRuby (there’s probably a way to filter it better but I’m new to JRuby, sorry), but here are the most salient lines:

	at RUBY.deserialize(uri:classloader:/lib/avro_utils/avro_kafka_serde.rb:31)
	at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(org/jruby/internal/runtime/methods/MixedModeIRMethod.java:126)
	at KafkaAvroHashSerdes$$Deserializer_1394592798.deserialize(KafkaAvroHashSerdes$$Deserializer_1394592798.gen:13)
	at org.apache.kafka.streams.state.StateSerdes.valueFrom(org/apache/kafka/streams/state/StateSerdes.java:156)
	at org.apache.kafka.streams.state.internals.RocksDBStore.get(org/apache/kafka/streams/state/internals/RocksDBStore.java:241)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:100)
	at org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(org/apache/kafka/streams/kstream/internals/KStreamReduce.java:70)

You can see the entire thing here:
https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd

Thank you!
Avi

Re: KStreams.reduceByKey passing nulls to my Deserializer?

Posted by Guozhang Wang <wa...@gmail.com>.
Thanks Avi!

On Tue, Jun 14, 2016 at 7:41 AM, Avi Flax <av...@parkassist.com> wrote:

>
> > On Jun 10, 2016, at 18:47, Guozhang Wang <wa...@gmail.com> wrote:
> >
> > Yes, this is possible
>
> OK, good to know — thanks!
>
> I just checked the code in the Deserializers included with Kafka and I see
> that they check for null values and simply pass them through; I guess
> that’s the correct behavior. I’ve opened a PR to clarify the Javadocs for
> Deserializer: https://github.com/apache/kafka/pull/1503
>
> > although we do check for nullable keys when doing reduce / aggregations
>
> Yeah, I’ve seen that, it makes sense and is very helpful.
>
> > We do not check whether there are any values returned from the underlying
> > state store, and when we cannot find such a match, null is returned to the
> > deserializer.
>
> OK, got it.
>
> > I think in general the library should guard against this case instead of
> > letting deserializers worry about it. Do you want to file a JIRA reporting
> > this bug so we can follow up?
>
> I agree. Done: https://issues.apache.org/jira/browse/KAFKA-3836
>
> Thanks!
> Avi




-- 
-- Guozhang

Re: KStreams.reduceByKey passing nulls to my Deserializer?

Posted by Avi Flax <av...@parkassist.com>.
> On Jun 10, 2016, at 18:47, Guozhang Wang <wa...@gmail.com> wrote:
> 
> Yes, this is possible

OK, good to know — thanks!

I just checked the code in the Deserializers included with Kafka and I see that they check for null values and simply pass them through; I guess that’s the correct behavior. I’ve opened a PR to clarify the Javadocs for Deserializer: https://github.com/apache/kafka/pull/1503
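
For what it’s worth, that behavior is easy to replicate in a custom serde. Here’s a rough sketch, in Java since that’s what the library itself uses, of a null-tolerant wrapper that mirrors the built-in deserializers; the class is purely illustrative, not part of Kafka:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

// Illustration only: delegate everything, but return null for null input
// (e.g. a state-store miss) instead of throwing, the same way Kafka's
// built-in deserializers such as StringDeserializer behave.
public class NullTolerantDeserializer<T> implements Deserializer<T> {

    private final Deserializer<T> inner;

    public NullTolerantDeserializer(Deserializer<T> inner) {
        this.inner = inner;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        return data == null ? null : inner.deserialize(topic, data);
    }

    @Override
    public void close() {
        inner.close();
    }
}

Wrapping our Avro deserializer this way would at least stop the exceptions while the underlying issue gets sorted out.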

> although we do check for nullable keys when doing reduce / aggregations

Yeah, I’ve seen that, it makes sense and is very helpful.

> We do not check whether there are any values returned from the underlying
> state store, and when we cannot find such a match, null is returned to the
> deserializer.

OK, got it.

> I think in general the library should guard against this case instead of
> letting deserializers worry about it. Do you want to file a JIRA reporting
> this bug so we can follow up?

I agree. Done: https://issues.apache.org/jira/browse/KAFKA-3836

Thanks!
Avi

Re: KStreams.reduceByKey passing nulls to my Deserializer?

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Avi,

Yes, this is possible: although we do check for nullable keys when doing reduce / aggregations:

https://github.com/apache/kafka/blob/0.10.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java#L67

We do not check whether there are any values returned from the underlying
state store, and when we cannot find such a match, null is returned to the
deserializer.
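
Concretely, the missing check is roughly the following; this is a sketch for illustration only, not the actual Kafka source or a proposed patch:

import org.apache.kafka.common.serialization.Deserializer;

// Illustration only: short-circuit on a state-store miss instead of handing
// null bytes to the value deserializer.
final class StoreValueGuard {
    static <V> V valueFrom(byte[] rawValue, Deserializer<V> deserializer, String topic) {
        if (rawValue == null) {
            return null; // no entry for this key in the store; nothing to deserialize
        }
        return deserializer.deserialize(topic, rawValue);
    }
}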


I think in general the library should guard against this case instead of
letting deserializers worry about it. Do you want to file a JIRA reporting
this bug so we can follow up?


Guozhang

On Fri, Jun 10, 2016 at 11:27 AM, Avi Flax <av...@parkassist.com> wrote:

>
> > On Jun 10, 2016, at 14:24, Avi Flax <av...@parkassist.com> wrote:
> >
> > Hi, I’m using Kafka Streams (0.10.0) with JRuby; most of my scripts/nodes
> > are working well at this point, except for one which uses reduceByKey.
>
> Whoops, I should probably share my code as well!
>
> Here’s the topology:
>
>
> builder.stream(key_serde, val_serde, 'visit-update-events')
>        .reduceByKey(-> (a, b) { a.merge b }, key_serde, val_serde,
> 'intermediate-visits')
>        .to(key_serde, val_serde, 'visits')
>
>
> This is using Ruby syntax, but hopefully it’s fairly readable. I’ve added
> it to this gist as well:
> https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd
>
> Thanks!
> Avi




-- 
-- Guozhang

Re: KStreams.reduceByKey passing nulls to my Deserializer?

Posted by Avi Flax <av...@parkassist.com>.
> On Jun 10, 2016, at 14:24, Avi Flax <av...@parkassist.com> wrote:
> 
> Hi, I’m using Kafka Streams (0.10.0) with JRuby; most of my scripts/nodes are working well at this point, except for one which uses reduceByKey.

Whoops, I should probably share my code as well!

Here’s the topology:


builder.stream(key_serde, val_serde, 'visit-update-events')
       .reduceByKey(-> (a, b) { a.merge b }, key_serde, val_serde, 'intermediate-visits')
       .to(key_serde, val_serde, 'visits')


This is using Ruby syntax, but hopefully it’s fairly readable. I’ve added it to this gist as well:
https://gist.github.com/aviflax/3428cdbaa18aca9bf0a958c6d5eac2bd
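
In case the Java API is easier to follow, here’s a rough equivalent of the same topology. It’s only a sketch: the String keys, Map values, and the VisitsTopology class are assumptions for illustration, and the serdes stand in for whatever the application already uses.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class VisitsTopology {

    public static KStreamBuilder build(Serde<String> keySerde,
                                       Serde<Map<String, Object>> valSerde) {
        KStreamBuilder builder = new KStreamBuilder();

        // Read the raw update events.
        KStream<String, Map<String, Object>> updates =
                builder.stream(keySerde, valSerde, "visit-update-events");

        // Merge successive updates per key; "intermediate-visits" names the
        // backing local state store.
        KTable<String, Map<String, Object>> visits =
                updates.reduceByKey((a, b) -> {
                    Map<String, Object> merged = new HashMap<>(a);
                    merged.putAll(b);
                    return merged;
                }, keySerde, valSerde, "intermediate-visits");

        // Write the continuously updated result to the output topic.
        visits.to(keySerde, valSerde, "visits");
        return builder;
    }
}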

Thanks!
Avi