Posted to users@kafka.apache.org by Dumitru-Nicolae Marasoui <Ni...@kaluza.com> on 2020/10/20 16:41:28 UTC

tombstones, kafkastreams, Avro & NPE

Hi,
I am trying to delete (tombstone) some records in a compacted topic by
emitting a ("key", null) key-value pair, which compaction should remove
after some time.
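For reference, the compaction semantics being relied on here can be sketched
as a toy simulation in plain Scala (the `compact` helper is illustrative, not
Kafka code): the latest value per key wins, and a null value deletes the key.

```scala
// Toy model of log compaction: keep only the latest value per key,
// and drop keys whose latest value is a tombstone (null).
def compact[K, V](log: Seq[(K, V)]): Map[K, V] =
  log.foldLeft(Map.empty[K, V]) {
    case (state, (k, null)) => state - k          // tombstone deletes the key
    case (state, (k, v))    => state.updated(k, v) // later value overwrites
  }
```

So after compaction runs, a key whose last record had a null value is gone
from the topic entirely.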

However, I am getting the exception below:
Exception in thread "SV-6c606e52-46eb-4a59-a006-27ce6ce1a603-StreamThread-1" java.lang.NullPointerException
  at magnolia.Magnolia$$anon$5.dereference(magnolia.scala:537)
  at com.sksamuel.avro4s.Encoder$$anon$15.$anonfun$encode$10(Encoder.scala:393)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
  at com.sksamuel.avro4s.Encoder$$anon$15.encode(Encoder.scala:360)
  at com.sksamuel.avro4s.ToRecord$$anon$1.to(ToRecord.scala:24)
  at com.sksamuel.avro4s.RecordFormat$$anon$1.to(RecordFormat.scala:23)
  at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:67)
  at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
  at com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$2.serialize(SerdeProvider.scala:64)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)
  at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:92)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
  at org.apache.kafka.streams.kstream.internals.KStreamFlatMap$KStreamFlatMapProcessor.process(KStreamFlatMap.java:42)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
  at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
  at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
  at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)

Do you have any idea?

Thank you
Nicolae

-- 

Dumitru-Nicolae Marasoui

Software Engineer


140-142, Kensington Church Street, London, W8 4BN

w kaluza.com <https://www.kaluza.com/>

LinkedIn <https://www.linkedin.com/company/kaluza> | Twitter
<https://twitter.com/Kaluza_tech>

Kaluza Ltd. registered in England and Wales No. 08785057

VAT No. 100119879

Help save paper - do you need to print this email?

Re: tombstones, kafkastreams, Avro & NPE

Posted by "Matthias J. Sax" <mj...@apache.org>.
Well, the exception is thrown from
`magnolia.Magnolia$$anon$5.dereference`, and I have no idea what that is...

Maybe the serde you are using does not handle `null` correctly?

Since `null` should be translated to `null`, it might be possible to
work around the issue by "wrapping" the serde with your own implementation
that checks for `null`, returns `null` directly, and only passes non-null
values into the actual serde.

It might also be worth filing a bug report for the tool you are using.
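To make the wrapping idea concrete, here is a minimal sketch. The `Serializer`
and `Deserializer` traits below are stand-ins so the snippet is self-contained;
in a real application you would implement Kafka's
`org.apache.kafka.common.serialization.Serializer`/`Deserializer` instead, and
the `NullSafe*` names are illustrative, not from any library:

```scala
// Minimal stand-ins for Kafka's Serializer/Deserializer interfaces so this
// sketch compiles without kafka-clients on the classpath.
trait Serializer[T] { def serialize(topic: String, data: T): Array[Byte] }
trait Deserializer[T] { def deserialize(topic: String, bytes: Array[Byte]): T }

// Null-safe wrappers: short-circuit null so a tombstone value never reaches
// the underlying Avro machinery (where the NPE originates).
class NullSafeSerializer[T](inner: Serializer[T]) extends Serializer[T] {
  def serialize(topic: String, data: T): Array[Byte] =
    if (data == null) null else inner.serialize(topic, data)
}

class NullSafeDeserializer[T >: Null](inner: Deserializer[T]) extends Deserializer[T] {
  def deserialize(topic: String, bytes: Array[Byte]): T =
    if (bytes == null) null else inner.deserialize(topic, bytes)
}
```

The wrapped serializer would then be configured on the sink in place of the
raw Avro serde, so tombstones pass through as plain null bytes.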


-Matthias

On 10/20/20 9:41 AM, Dumitru-Nicolae Marasoui wrote:
> Hi,
> I am trying to delete (tombstone) some records in a compacted topic, by
> means of emitting a ("key", null) key-value pair, which by compaction would
> get removed after some time.
> 
> However, I am getting the exception below:
> [stack trace snipped; identical to the original message above]
> 
> Do you have any idea?
> 
> Thank you
> Nicolae
> 

Re: tombstones, kafkastreams, Avro & NPE

Posted by Mathieu D <ma...@gmail.com>.
Hi Nicolae

A shot in the dark: make sure to handle nulls separately in your custom
serializer. Something like this:


// T is the case class the topic's Avro schema is derived for.
private val _serializer = new KafkaAvroSerializer {
  override def serialize(topic: String, obj: Any): Array[Byte] = obj match {
    case null => null // tombstone: pass null through, skip Avro encoding
    case _ =>
      val record = RecordFormat[T].to(obj.asInstanceOf[T])
      super.serialize(topic, record)
  }
}

private val _deserializer = new KafkaAvroDeserializer {
  override def deserialize(s: String, bytes: Array[Byte]): T = bytes match {
    case null => null // tombstone: pass null through, skip Avro decoding
    case _ =>
      val record = super.deserialize(s, bytes).asInstanceOf[IndexedRecord]
      RecordFormat[T].from(record)
  }
}

HTH
Mathieu


On Tue, Oct 20, 2020 at 18:41, Dumitru-Nicolae Marasoui <
Nicolae.Marasoiu@kaluza.com> wrote:

> Hi,
> I am trying to delete (tombstone) some records in a compacted topic, by
> means of emitting a ("key", null) key-value pair, which by compaction would
> get removed after some time.
>
> However, I am getting the exception below:
> [stack trace and signature snipped; identical to the original message above]
>
> Do you have any idea?
>
> Thank you
> Nicolae
>