You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Dumitru-Nicolae Marasoui <ni...@ovoenergy.com> on 2020/06/17 15:29:37 UTC

NPE in kafka-streams with Avro input

Hello kafka community,
When the following kafka-streams starts with input topic values in avro
format, we get this NPE below. The input is a record and a field of it is
an array of other records. Reading the stack trace below what I understand
is that at some point in deserializing a value structure it encounters an
unexpected null value and hence the NPE. Do you have any hints as to what
may be the problem? In this kafka-streams ETL job we emit multiple messages
from a single input message (flatMapping the array field to the output).
Thank you
Exception in thread
"global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1"
java.lang.NullPointerException
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
at
com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
at
com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
at
org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
at
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
at
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
at
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
at
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
at
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Thank you,
Nicolae

-- 
Thank you,
Nicolae Marasoiu
Scala Engineer
Orion, OVO Group

Re: NPE in kafka-streams with Avro input

Posted by Ricardo Ferreira <ri...@riferrei.com>.
Hi Dumitru,

According to the stack trace that you've shared, the NPE is being thrown 
by the framework called *Avro4S* that you're using. This is important 
because it isolates the problem: it means the problem is not in Kafka 
Streams itself but rather in your serialization framework.

Nevertheless, the Avro specification allows fields to be null if you 
explicitly declare this in the Avro schema. For instance:

```

{
   "type": "record",
   "name": "MyRecord",
   "fields" : [
     {"name": "userId", "type": "long"},              // mandatory field
     {"name": "userName", "type": ["null", "string"]} // optional field
   ]
}

```

The field *userName* above can have null values and be treated as 
optional. You may want to check whether you can make this change in the 
Avro schema — or, if it is already in place, whether the serialization 
framework that you're using handles situations like this correctly.

Thanks,

-- Ricardo

On 6/17/20 11:29 AM, Dumitru-Nicolae Marasoui wrote:
> Hello kafka community,
> When the following kafka-streams starts with input topic values in avro
> format, we get this NPE below. The input is a record and a field of it is
> an array of other records. Reading the stack trace below what I understand
> is that at some point in deserializing a value structure it encounters an
> unexpected null value and hence the NPE. Do you have any hints as to what
> may be the problem? In this kafka-streams ETL job we emit multiple messages
> from a single input message (flatMapping the array field to the output).
> Thank you
> Exception in thread
> “global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1"
> java.lang.NullPointerException
> at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
> at
> com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
> at scala.collection.TraversableLike.map(TraversableLike.scala:238)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
> at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
> at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
> at
> com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
> at
> org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
> at
> org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
> at
> org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
> at
> org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
> at
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
> at
> org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> at
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:131)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:123)
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:36)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:262)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
> at
> org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceProcessor.process(KStreamReduce.java:103)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:428)
> at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Thank you,
> Nicolae
>