Posted to users@kafka.apache.org by Nitay Kufert <ni...@ironsrc.com> on 2021/01/13 13:45:56 UTC

NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Hey,
*Without any code change*, just by bumping the Kafka version from 2.5.1 to
2.6.1 (clients only), my Streams application started throwing
NullPointerException (intermittently, with no predictable pattern).
It may be worth mentioning that I also removed the "UPGRADE_FROM" config
that had been left over from older versions.

We are using Scala 2.12, and the line that throws this exception is using
flatMapValues:


>  inputStream.flatMapValues(_.split) # return type
> KStream[Windowed[String], SingleInputMessage]


Where inputStream is of type: KStream[Windowed[String], InputMessage] and
the split method splits this InputMessage into several
SingleInputMessage messages (hence the flat - to avoid
List[SingleInputMessage]).
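
To make the shape of this step concrete, here is a minimal sketch in plain Scala collections (no Kafka dependency). InputMessage, SingleInputMessage and their fields are hypothetical stand-ins, since the real classes are not shown, and flatMapValues here is only a model of the DSL operation, not the library code.

```scala
// Hypothetical stand-ins for the real message types, which this thread does not show.
final case class SingleInputMessage(id: String)
final case class InputMessage(ids: List[String]) {
  // One InputMessage fans out into several SingleInputMessage values.
  def split: List[SingleInputMessage] = ids.map(SingleInputMessage(_))
}

object FlatMapValuesModel {
  // Plain-collections model of flatMapValues: the key is kept and each value
  // is replaced by zero or more output values.
  def flatMapValues[K, V, VR](records: Seq[(K, V)])(f: V => Iterable[VR]): Seq[(K, VR)] =
    records.flatMap { case (k, v) => f(v).map(vr => (k, vr)) }
}
```

In this model, calling FlatMapValuesModel.flatMapValues(records)(_.split) on a record whose value is null throws a NullPointerException inside the `_.split` lambda, because the method is invoked on a null reference.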

The exception:

> java.lang.NullPointerException: null Wrapped by:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=2_2, processor=unique_input_message-repartition-source,
> topic=service-unique_input_message-repartition, partition=2,
> offset=318846738, stacktrace=java.lang.NullPointerException
>

java.lang.NullPointerException: null at
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> at
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> at
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> at
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> ... 4 common frames omitted Wrapped by:
> org.apache.kafka.streams.errors.StreamsException: Exception caught in
> process. taskId=2_2, processor=unique_input_message-repartition-source,
> topic=service-unique_input_message-repartition, partition=2,
> offset=318846738, stacktrace=java.lang.NullPointerException at
> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> at
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> at
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> at
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> at
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> at
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> at
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> at
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> at
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> at
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> at
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> at
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> at
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>

* The 2nd frame of the stack trace is there because we are using the Scala
API (FunctionsCompatConversions.scala:62):

> implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR]) extends AnyVal {
>   def asValueMapper: ValueMapper[V, JIterable[VR]] = (value: V) => f(value).asJava
> }
>

The main point is that we didn't change anything in the app code, so
I wonder whether this is a new bug or a bug in our implementation that
somehow didn't surface in 2.5.1 (or in previous versions, since this logic
is old).

Thanks, and let me know what else would help. I wish I knew how to reproduce
it: it happened 6 times during the last day and I have no idea why. I'll try
to catch it with logs.
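
One way such logging could look, as a sketch only: it assumes (unconfirmed) that the NPE comes from a null value reaching the `_.split` lambda, which the top frame of the trace would be consistent with. NullSafeSplit, nullSafe and onNull are hypothetical names introduced for illustration; they are not part of Kafka Streams.

```scala
object NullSafeSplit {
  // Wraps a value mapper so that a null input produces no output records
  // instead of throwing. onNull is a hook where the skipped record can be
  // logged or counted.
  def nullSafe[V, VR](f: V => Iterable[VR])(onNull: () => Unit): V => Iterable[VR] =
    (value: V) =>
      if (value == null) {
        onNull()
        Iterable.empty[VR]
      } else f(value)
}
```

The wrapped function has the same V => Iterable[VR] shape that the FunctionsCompatConversions snippet above accepts, so it could be passed to flatMapValues in place of `_.split`. Whether dropping such records is correct for the application is a separate question; the sketch is only meant to surface when and how often a null value arrives.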

-- 

Nitay Kufert
Backend Team Leader

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Nitay,

I have not yet heard of anyone else reporting something similar that might
point to a bug. Maybe you could try to reproduce the issue by first starting
a brand new app on 2.5, then following the upgrade path (with config
overrides) to 2.6, and see whether it is easily reproducible? If it is,
please create a JIRA ticket.

Guozhang

On Wed, Jun 23, 2021 at 3:50 PM Nitay Kufert <ni...@is.com.invalid> wrote:

> Bumping on the off chance that, in the meantime, a bug was reported that
> might explain this behaviour. I would feel more comfortable bumping our
> Kafka versions that way :)
>
> On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <ni...@ironsrc.com> wrote:
>
> > I guess it's possible, but very unlikely, because it works perfectly with
> > all the previous versions, including the one we currently run (2.5.1).
> > Why would a change in the version introduce nulls there?
> >
> > On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wa...@gmail.com> wrote:
> >
> >> Is it possible that the flattened values contain `null` and hence
> >> `_.split` throws?
> >>
> >> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com> wrote:
> >>
> >> > Hey, I missed your reply - but the code I shared above the logs is the
> >> > code around those lines (I removed some identifiers to make it a
> >> > little more generic):
> >> >
> >> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> >> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> >> > > SingleInputMessage]
> >> >
> >> >
> >> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com> wrote:
> >> >
> >> > > Could you share your code around
> >> > >
> >> > > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > >
> >> > > That seems to be where NPE is thrown.
> >> > >
> >> > >
> >> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <ni...@ironsrc.com>
> >> > wrote:
> >> > >
> >> > > > Hey,
> >> > > > *Without any code change*, just by bumping the kafka version from
> >> 2.5.1
> >> > > to
> >> > > > 2.6.1 (clients only) - my stream application started throwing
> >> > > > NullPointerException (sometimes, not in a predicted pattern).
> >> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM"
> >> conf
> >> > > > that was forgotten there from the older versions.
> >> > > >
> >> > > > We are using Scala 2.12, and the line that throws this exception
> is
> >> > using
> >> > > > flatMapValues:
> >> > > >
> >> > > >
> >> > > > >  inputStream.flatMapValues(_.split) # return type
> >> > > > > KStream[Windowed[String], SingleInputMessage]
> >> > > >
> >> > > >
> >> > > > Where inputStream is of type: KStream[Windowed[String],
> >> InputMessage]
> >> > and
> >> > > > the split method splits this InputMessage into several
> >> > > > SingleInputMessage messages (hence the flat - to avoid
> >> > > > List[SingleInputMessage]).
> >> > > >
> >> > > > The exception:
> >> > > >
> >> > > > > java.lang.NullPointerException: null Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
> >> caught in
> >> > > > > process. taskId=2_2,
> >> > processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > >
> >> > > >
> >> > > > java.lang.NullPointerException: null at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > > ... 4 common frames omitted Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> >> > > > > process. taskId=2_2, processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > > at com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > > at org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > > at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > > at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > > at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > > at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > > at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > > at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> >> > > > > at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > > >
> >> > > >
> >> > > > * 2nd line of the exception is because we are using Scala
> >> > > > (FunctionsCompatConversions.scala:62)
> >> > > >
> >> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V =>
> >> > > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V,
> >> > > > > JIterable[VR]] = (value: V) => f(value).*asJava* }
> >> > > > >
> >> > > >
> >> > > > The main thing here is that we didn't change anything in the app code..
> >> > > > so i wonder if it's a new bug OR our implementation bug that somehow didn't
> >> > > > happen in 2.5.1 (or previous versions, since this logic is old)
> >> > > >
> >> > > > Thanks and let me know what else can help (i wish i knew how to reproduce,
> >> > > > it happened 6 times during the last day and no idea why.. i'll try to catch
> >> > > > it with logs)
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>


-- 
-- Guozhang

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Nitay Kufert <ni...@is.com.INVALID>.
Bumping this on the off chance that a bug explaining this behaviour has been
reported in the meantime. I would feel more comfortable bumping our Kafka
versions that way :)
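
A minimal sketch of a guard for the null case raised in the quoted question
below, assuming the value handed to flatMapValues can itself be null. The
trace passes through CachingSessionStore.remove and SessionCacheFlushListener,
so a null value forwarded for a removed session is one possible source; that
is an assumption, not something confirmed in this thread. The two case classes
are hypothetical stand-ins, since the real InputMessage and SingleInputMessage
are not shown here.

```scala
// Hypothetical stand-ins: the thread does not show the real message classes.
final case class SingleInputMessage(payload: String)

final case class InputMessage(payloads: List[String]) {
  // Same shape as the thread's `split`: one input fans out to several outputs.
  def split: List[SingleInputMessage] = payloads.map(SingleInputMessage(_))
}

object NullSafeSplit {
  // Option(null) is None, so a null value yields an empty result
  // instead of a NullPointerException when `split` is called on it.
  def safeSplit(value: InputMessage): Iterable[SingleInputMessage] =
    Option(value).map(_.split).getOrElse(Nil)
}
```

In the topology this would presumably replace `_.split`, e.g.
`inputStream.flatMapValues(v => NullSafeSplit.safeSplit(v))`. Logging the key
whenever the value is null would also help confirm whether nulls are really
what reaches that line.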

On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <ni...@ironsrc.com> wrote:

> I guess it's possible, but very unlikely, because it works perfectly with
> all the previous versions and the current one (2.5.1).
> Why would a change in the version introduce nulls there?
>
> On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Is it possible that the flattened values contain `null` and hence
>> `_.split` throws?
>>
>> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com> wrote:
>>
>> > Hey, missed your reply - but the code I've shared above the logs is the
>> > code around those lines (removed some identifiers to make it a little bit
>> > more generic):
>> >
>> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
>> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
>> > > SingleInputMessage]
>> >
>> >
>> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com> wrote:
>> >
>> > > Could you share your code around
>> > >
>> > > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > >
>> > > That seems to be where NPE is thrown.
>> > >
>> > >
>> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <ni...@ironsrc.com> wrote:
>> > >
>> > > > Hey,
>> > > > *Without any code change*, just by bumping the kafka version from 2.5.1 to
>> > > > 2.6.1 (clients only) - my stream application started throwing
>> > > > NullPointerException (sometimes, not in a predicted pattern).
>> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf
>> > > > that was forgotten there from the older versions.
>> > > >
>> > > > We are using Scala 2.12, and the line that throws this exception is using
>> > > > flatMapValues:
>> > > >
>> > > >
>> > > > >  inputStream.flatMapValues(_.split) # return type
>> > > > > KStream[Windowed[String], SingleInputMessage]
>> > > >
>> > > >
>> > > > Where inputStream is of type: KStream[Windowed[String], InputMessage] and
>> > > > the split method splits this InputMessage into several
>> > > > SingleInputMessage messages (hence the flat - to avoid
>> > > > List[SingleInputMessage]).
>> > > >
>> > > > The exception:
>> > > >
>> > > > > java.lang.NullPointerException: null Wrapped by:
>> > > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
>> > > > > process. taskId=2_2, processor=unique_input_message-repartition-source,
>> > > > > topic=service-unique_input_message-repartition, partition=2,
>> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
>> > > > >
>> > > >
>> > > > java.lang.NullPointerException: null
>> > > > > at com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > > > > at org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
>> > > > > at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
>> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
>> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
>> > > > > at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>> > > > > at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
>> > > > > at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>> > > > > ... 4 common frames omitted Wrapped by:
>> > > > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
>> > > > > process. taskId=2_2, processor=unique_input_message-repartition-source,
>> > > > > topic=service-unique_input_message-repartition, partition=2,
>> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
>> > > > > at com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > > > > at org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
>> > > > > at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
>> > > > > at org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
>> > > > > at org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
>> > > > > at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>> > > > > at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>> > > > > at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
>> > > > > at org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
>> > > > > at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
>> > > > > at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
>> > > > > at org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>> > > > >
>> > > >
>> > > > * 2nd line of the exception is because we are using Scala
>> > > > (FunctionsCompatConversions.scala:62)
>> > > >
>> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V =>
>> > > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V,
>> > > > > JIterable[VR]] = (value: V) => f(value).*asJava* }
>> > > > >
>> > > >
>> > > > The main thing here is that we didn't change anything in the app code,
>> > > > so I wonder if it's a new bug OR a bug in our implementation that somehow
>> > > > didn't happen in 2.5.1 (or previous versions, since this logic is old).
>> > > >
>> > > > Thanks, and let me know what else can help (I wish I knew how to reproduce;
>> > > > it happened 6 times during the last day and I have no idea why. I'll try
>> > > > to catch it with logs).
>> > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>
>

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Nitay Kufert <ni...@ironsrc.com>.
I guess it's possible, but very unlikely, because it works perfectly with all
the previous versions and with the current one (2.5.1).
Why would a change in the version introduce nulls there?
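
One way to check whether a null value is reaching the mapper is to make the
mapper tolerate null and see whether the exception goes away. The sketch below
is only an illustration under stated assumptions: InputMessage and
SingleInputMessage here are hypothetical stand-ins (the real classes were not
posted in this thread), and safeSplit is a name invented for the example. It
relies on Option(null) evaluating to None in Scala.

```scala
// Hypothetical stand-ins for the types named in the thread; the real
// InputMessage and SingleInputMessage definitions were not shared.
final case class SingleInputMessage(payload: String)

final case class InputMessage(parts: List[String]) {
  // Assumed shape of the thread's `split`: one output message per part.
  def split: List[SingleInputMessage] = parts.map(SingleInputMessage(_))
}

object NullSafeSplit {
  // Option(null) is None, so a null value yields an empty list
  // instead of a NullPointerException when `split` is called on it.
  def safeSplit(value: InputMessage): List[SingleInputMessage] =
    Option(value).map(_.split).getOrElse(Nil)
}
```

Assuming the types line up with the real application, this could be wired in
as inputStream.flatMapValues(NullSafeSplit.safeSplit _) in place of
flatMapValues(_.split). If the exception disappears, a null value was being
passed to the mapper; that would not by itself explain why 2.6.1 behaves
differently from 2.5.1.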

On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wa...@gmail.com> wrote:

> Is it possible that the flattened values contain `null` and hence `_.split`
> throws?
>
> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com> wrote:
>
> > Hey, missed your reply - but the code I've shared above the logs is the
> > code around those lines (removed some identifiers to make it a little bit
> > more generic):
> >
> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> > > SingleInputMessage]
> >
> >
> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> > > Could you share your code around
> > >
> > > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> > >
> > > That seems to be where NPE is thrown.
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
>
>
> --
> -- Guozhang
>



Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Guozhang Wang <wa...@gmail.com>.
Is it possible that the flattened values contain `null` and hence `_.split`
throws?
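
If null values are the cause, one option is to guard the mapper so a null
flattens to an empty result rather than reaching `_.split`. The sketch below is
only an illustration: `InputMessage` and `SingleInputMessage` are hypothetical
stand-ins (the real classes are not shown in this thread), and `safeSplit` is an
invented helper name.

```scala
// Hypothetical stand-ins for the thread's types; the real classes are not shown.
final case class SingleInputMessage(payload: String)

final case class InputMessage(payloads: List[String]) {
  // Splits one message into several single messages.
  def split: List[SingleInputMessage] = payloads.map(SingleInputMessage(_))
}

object NullSafeSplit {
  // Option(null) is None, so a null value yields an empty list
  // instead of throwing when .split is invoked on it.
  def safeSplit(value: InputMessage): Iterable[SingleInputMessage] =
    Option(value).toList.flatMap(_.split)
}
```

Assuming the Scala DSL's `flatMapValues` takes a `V => Iterable[VR]` function,
as the quoted `FlatValueMapperFromFunction` wrapper suggests, this could be
passed as `inputStream.flatMapValues(NullSafeSplit.safeSplit _)`. Whether
silently dropping nulls is acceptable depends on the application.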

On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com> wrote:

> Hey, missed your reply - but the code I've shared above the logs is the
> code around those lines (removed some identifiers to make it a little bit
> more generic):
>
> > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> > SingleInputMessage]
>
>
> On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com> wrote:
>
> > Could you share your code around
> >
> > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >
> > That seems to be where NPE is thrown.
> >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>


-- 
-- Guozhang

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Nitay Kufert <ni...@ironsrc.com>.
Hey, missed your reply - but the code I've shared above the logs is the
code around those lines (removed some identifiers to make it a little bit
more generic):

> inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> ${v.printForDebug}")}) # return type KStream[Windowed[String],
> SingleInputMessage]
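
Note that the `peek` in this snippet sits downstream of `flatMapValues`, so it
only sees records that were split successfully. To find out which records
trigger the exception, a key-aware wrapper around the mapper could record the
key whenever the value is null. This is a rough sketch, not code from the
application: `NullValueProbe`, `probing`, and `nullKeys` are made-up names, and
a real service would write to its logger instead of an in-memory buffer.

```scala
import scala.collection.mutable.ListBuffer

object NullValueProbe {
  // Keys whose value arrived as null. Not thread-safe; for illustration only.
  val nullKeys: ListBuffer[String] = ListBuffer.empty

  // Wraps a value mapper: a null value is recorded with its key and
  // flattened to nothing; any other value is passed to the mapper unchanged.
  def probing[K, V, VR](f: V => Iterable[VR]): (K, V) => Iterable[VR] =
    (key, value) =>
      if (value == null) {
        nullKeys += String.valueOf(key)
        Nil
      } else f(value)
}
```

Assuming the key-aware `flatMapValues((k, v) => ...)` overload of the Scala DSL,
it could be wired in as
`inputStream.flatMapValues(NullValueProbe.probing[Windowed[String], InputMessage, SingleInputMessage](_.split))`.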


On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com> wrote:

> Could you share your code around
>
> > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>
> That seems to be where NPE is thrown.
>
>
> On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <ni...@ironsrc.com> wrote:
>
> > Hey,
> > *Without any code change*, just by bumping the kafka version from 2.5.1 to
> > 2.6.1 (clients only) - my stream application started throwing
> > NullPointerException (sometimes, not in a predicted pattern).
> > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM" conf
> > that was forgotten there from the older versions.
> >
> > We are using Scala 2.12, and the line that throws this exception is using
> > flatMapValues:
> >
> >
> > >  inputStream.flatMapValues(_.split) # return type
> > > KStream[Windowed[String], SingleInputMessage]
> >
> >
> > Where inputStream is of type: KStream[Windowed[String], InputMessage] and
> > the split method splits this InputMessage into several
> > SingleInputMessage messages (hence the flat - to avoid
> > List[SingleInputMessage]).
> >
> > The exception:
> >
> > > java.lang.NullPointerException: null Wrapped by:
> > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > > process. taskId=2_2, processor=unique_input_message-repartition-source,
> > > topic=service-unique_input_message-repartition, partition=2,
> > > offset=318846738, stacktrace=java.lang.NullPointerException
> > >
> >
> > java.lang.NullPointerException: null at
> > > com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> > > at
> > > org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> > > at
> > > org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> > > at
> > > org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> > > at
> > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> > > at
> > > org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> > > at
> > > org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> > > at
> > > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> > > at
> > > org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> > > at
> > > org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> > > at
> > > org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> > > at
> > > org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> > > at
> > > org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> > > at
> > > org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> > > at
> > > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> > > at
> > > org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> > > at
> > > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> > > at
> > > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> > > ... 4 common frames omitted Wrapped by:
> > > org.apache.kafka.streams.errors.StreamsException: Exception caught in
> > > process. taskId=2_2, processor=unique_input_message-repartition-source,
> > > topic=service-unique_input_message-repartition, partition=2,
> > > offset=318846738, stacktrace=java.lang.NullPointerException at
> > >
> >
> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> > > at
> > >
> >
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> > > at
> > >
> >
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> > > at
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> > >
> >
> > * The 2nd frame of the stack trace comes from the Scala wrapper
> > (FunctionsCompatConversions.scala:62):
> >
> > > implicit class FlatValueMapperFromFunction[V, VR](val f: V => Iterable[VR])
> > >     extends AnyVal {
> > >   def asValueMapper: ValueMapper[V, JIterable[VR]] =
> > >     (value: V) => f(value).asJava
> > > }
> > >
> >
> > The main thing here is that we didn't change anything in the app code, so
> > I wonder whether it's a new bug or a bug in our implementation that somehow
> > didn't surface in 2.5.1 (or in previous versions, since this logic is old).
> >
> > Thanks, and let me know what else can help. I wish I knew how to reproduce
> > it: it happened 6 times during the last day and I have no idea why. I'll
> > try to catch it with logs.
> >
>
>
> --
> -- Guozhang
>


-- 

Nitay Kufert
Backend Team Leader
[image: ironSource] <http://www.ironsrc.com>

email nitay.k@ironsrc.com
mobile +972-54-5480021
fax +972-77-5448273
skype nitay.kufert.ssa
121 Menachem Begin St., Tel Aviv, Israel
ironsrc.com <http://www.ironsrc.com>

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Guozhang Wang <wa...@gmail.com>.
Could you share the code around this frame?

> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)

That seems to be where the NPE is thrown.
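
For anyone hitting the same frame: the trace puts the NPE inside the function
passed to flatMapValues, so one thing worth ruling out is a null value reaching
`_.split`. That is only a guess from the frames (the session-windowed
aggregation lower in the trace may forward null-valued records when a session
is removed or merged); nothing in this thread confirms it. Below is a minimal,
Kafka-free Scala sketch of a null guard. InputMessage, SingleInputMessage,
their fields, and safeSplit are hypothetical stand-ins, since the real classes
are not shown here.

```scala
// Scala 2.12 converters; on 2.13 the equivalent import is scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical stand-ins for the application's message types (assumption).
final case class SingleInputMessage(id: String)
final case class InputMessage(ids: List[String]) {
  def split: List[SingleInputMessage] = ids.map(SingleInputMessage(_))
}

object NullSafeSplit {
  // Same shape as the function handed to flatMapValues (V => Iterable[VR]),
  // but a null value produces an empty result instead of throwing.
  val safeSplit: InputMessage => Iterable[SingleInputMessage] =
    msg => Option(msg).map(_.split).getOrElse(Nil)

  def main(args: Array[String]): Unit = {
    // The unguarded version dereferences the value directly.
    val unsafeSplit: InputMessage => Iterable[SingleInputMessage] = _.split

    // Mirrors f(value).asJava from the wrapper: a null value fails inside f.
    val attempt = Try(unsafeSplit(null).asJava)
    assert(attempt.isFailure)
    assert(attempt.failed.get.isInstanceOf[NullPointerException])

    // The guarded version returns an empty Iterable for null ...
    assert(!safeSplit(null).asJava.iterator().hasNext)
    // ... and behaves like split for a real message.
    assert(safeSplit(InputMessage(List("a", "b"))).size == 2)
  }
}
```

In the topology this would read inputStream.flatMapValues(NullSafeSplit.safeSplit).
Logging the null records before dropping them would show whether they really
occur and where they come from.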


-- 
-- Guozhang