You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Nitay Kufert <ni...@is.com.INVALID> on 2021/06/22 16:26:12 UTC

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Bumping for the off chance that during this time some sort of a bug was
reported that might explain this behaviour..  i will feel more comfortable
bumping our kafka versions this way :)

On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <ni...@ironsrc.com> wrote:

> I guess it's possible but very unlikely because it works perfectly with
> all the previous versions and the current one? (2.5.1)
> Why did a change in the version introduce NULLS there?
>
> On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wa...@gmail.com> wrote:
>
>> Is it possible that the flattened values contain `null` and hence
>> `_.split`
>> throws?
>>
>> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com> wrote:
>>
>> > Hey, missed your replay - but the code i've shared above the logs is the
>> > code around those lines (removed some identifiers to make it a little
>> bit
>> > more generic):
>> >
>> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
>> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
>> > > SingleInputMessage]
>> >
>> >
>> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com>
>> wrote:
>> >
>> > > Could you share your code around
>> > >
>> > > >
>> > >
>> > >
>> >
>> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > >
>> > > That seems to be where NPE is thrown.
>> > >
>> > >
>> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <ni...@ironsrc.com>
>> > wrote:
>> > >
>> > > > Hey,
>> > > > *Without any code change*, just by bumping the kafka version from
>> 2.5.1
>> > > to
>> > > > 2.6.1 (clients only) - my stream application started throwing
>> > > > NullPointerException (sometimes, not in a predicted pattern).
>> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM"
>> conf
>> > > > that was forgotten there from the older versions.
>> > > >
>> > > > We are using Scala 2.12, and the line that throws this exception is
>> > using
>> > > > flatMapValues:
>> > > >
>> > > >
>> > > > >  inputStream.flatMapValues(_.split) # return type
>> > > > > KStream[Windowed[String], SingleInputMessage]
>> > > >
>> > > >
>> > > > Where inputStream is of type: KStream[Windowed[String],
>> InputMessage]
>> > and
>> > > > the split method splits this InputMessage into several
>> > > > SingleInputMessage messages (hence the flat - to avoid
>> > > > List[SingleInputMessage]).
>> > > >
>> > > > The exception:
>> > > >
>> > > > > java.lang.NullPointerException: null Wrapped by:
>> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
>> caught in
>> > > > > process. taskId=2_2,
>> > processor=unique_input_message-repartition-source,
>> > > > > topic=service-unique_input_message-repartition, partition=2,
>> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
>> > > > >
>> > > >
>> > > > java.lang.NullPointerException: null at
>> > > > >
>> > > >
>> > >
>> >
>> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>> > > > > ... 4 common frames omitted Wrapped by:
>> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
>> caught in
>> > > > > process. taskId=2_2,
>> > processor=unique_input_message-repartition-source,
>> > > > > topic=service-unique_input_message-repartition, partition=2,
>> > > > > offset=318846738, stacktrace=java.lang.NullPointerException at
>> > > > >
>> > > >
>> > >
>> >
>> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>> > > > > at
>> > > > >
>> > > >
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>> > > > >
>> > > >
>> > > > * 2nd line of the exception is because we are using Scala
>> > > > (FunctionsCompatConversions.scala:62)
>> > > >
>> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V =>
>> > > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V,
>> > > > > JIterable[VR]] = (value: V) => f(value).*asJava* }
>> > > > >
>> > > >
>> > > > The main thing here is that we didn't change anything in the app
>> code..
>> > > so
>> > > > i wonder if it's a new bug OR our implementation bug that somehow
>> > didn't
>> > > > happen in 2.5.1 (or previous versions, since this logic is old)
>> > > >
>> > > > Thanks and let me know what else can help (i wish i knew how to
>> > > reproduce,
>> > > > it happened 6 times during the last day and no idea why.. i'll try
>> to
>> > > catch
>> > > > it with logs)
>> > > >
>> > > > --
>> > > >
>> > > > Nitay Kufert
>> > > > Backend Team Leader
>> > > > [image: ironSource] <http://www.ironsrc.com>
>> > > >
>> > > > email nitay.k@ironsrc.com
>> > > > mobile +972-54-5480021
>> > > > fax +972-77-5448273
>> > > > skype nitay.kufert.ssa
>> > > > 121 Menachem Begin St., Tel Aviv, Israel
>> > > > ironsrc.com <http://www.ironsrc.com>
>> > > > [image: linkedin] <https://www.linkedin.com/company/ironsource>
>> > [image:
>> > > > twitter] <https://twitter.com/ironsource> [image: facebook]
>> > > > <https://www.facebook.com/ironSource> [image: googleplus]
>> > > > <https://plus.google.com/+ironsrc>
>> > > > This email (including any attachments) is for the sole use of the
>> > > intended
>> > > > recipient and may contain confidential information which may be
>> > protected
>> > > > by legal privilege. If you are not the intended recipient, or the
>> > > employee
>> > > > or agent responsible for delivering it to the intended recipient,
>> you
>> > are
>> > > > hereby notified that any use, dissemination, distribution or
>> copying of
>> > > > this communication and/or its content is strictly prohibited. If you
>> > are
>> > > > not the intended recipient, please immediately notify us by reply
>> email
>> > > or
>> > > > by telephone, delete this email and destroy any copies. Thank you.
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> > --
>> >
>> > Nitay Kufert
>> > Backend Team Leader
>> > [image: ironSource] <http://www.ironsrc.com>
>> >
>> > email nitay.k@ironsrc.com
>> > mobile +972-54-5480021
>> > fax +972-77-5448273
>> > skype nitay.kufert.ssa
>> > 121 Menachem Begin St., Tel Aviv, Israel
>> > ironsrc.com <http://www.ironsrc.com>
>> > [image: linkedin] <https://www.linkedin.com/company/ironsource> [image:
>> > twitter] <https://twitter.com/ironsource> [image: facebook]
>> > <https://www.facebook.com/ironSource> [image: googleplus]
>> > <https://plus.google.com/+ironsrc>
>> > This email (including any attachments) is for the sole use of the
>> intended
>> > recipient and may contain confidential information which may be
>> protected
>> > by legal privilege. If you are not the intended recipient, or the
>> employee
>> > or agent responsible for delivering it to the intended recipient, you
>> are
>> > hereby notified that any use, dissemination, distribution or copying of
>> > this communication and/or its content is strictly prohibited. If you are
>> > not the intended recipient, please immediately notify us by reply email
>> or
>> > by telephone, delete this email and destroy any copies. Thank you.
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>
> --
>
> Nitay Kufert
> Backend Team Leader
> [image: ironSource] <http://www.ironsrc.com>
>
> email nitay.k@ironsrc.com
> mobile +972-54-5480021
> fax +972-77-5448273
> skype nitay.kufert.ssa
> 121 Menachem Begin St., Tel Aviv, Israel
> ironsrc.com <http://www.ironsrc.com>
> [image: linkedin] <https://www.linkedin.com/company/ironsource> [image:
> twitter] <https://twitter.com/ironsource> [image: facebook]
> <https://www.facebook.com/ironSource> [image: googleplus]
> <https://plus.google.com/+ironsrc>
> This email (including any attachments) is for the sole use of the intended
> recipient and may contain confidential information which may be protected
> by legal privilege. If you are not the intended recipient, or the employee
> or agent responsible for delivering it to the intended recipient, you are
> hereby notified that any use, dissemination, distribution or copying of
> this communication and/or its content is strictly prohibited. If you are
> not the intended recipient, please immediately notify us by reply email or
> by telephone, delete this email and destroy any copies. Thank you.
>

Re: NullPointerException after upgrading from 2.5.1 to 2.6.1 in my stream app

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Nitay,

I have not heard someone else reporting similar things that may point to a
bug still.. Maybe you could try to reproduce the issue by first starting a
brand new app in 2.5, and then follow the upgrade path (with config
overrides) to 2.6 and see if it is easily reproducible, and if yes create a
JIRA ticket?

Guozhang

On Wed, Jun 23, 2021 at 3:50 PM Nitay Kufert <ni...@is.com.invalid> wrote:

> Bumping for the off chance that during this time some sort of a bug was
> reported that might explain this behaviour..  i will feel more comfortable
> bumping our kafka versions this way :)
>
> On Wed, Feb 24, 2021 at 12:48 PM Nitay Kufert <ni...@ironsrc.com> wrote:
>
> > I guess it's possible but very unlikely because it works perfectly with
> > all the previous versions and the current one? (2.5.1)
> > Why did a change in the version introduce NULLS there?
> >
> > On Tue, Feb 23, 2021 at 9:16 PM Guozhang Wang <wa...@gmail.com>
> wrote:
> >
> >> Is it possible that the flattened values contain `null` and hence
> >> `_.split`
> >> throws?
> >>
> >> On Tue, Feb 23, 2021 at 8:23 AM Nitay Kufert <ni...@ironsrc.com>
> wrote:
> >>
> >> > Hey, missed your replay - but the code i've shared above the logs is
> the
> >> > code around those lines (removed some identifiers to make it a little
> >> bit
> >> > more generic):
> >> >
> >> > > inputStream.flatMapValues(_.split).peek((k, v) => {val _ = $k ->
> >> > > ${v.printForDebug}")}) # return type KStream[Windowed[String],
> >> > > SingleInputMessage]
> >> >
> >> >
> >> > On Fri, Jan 29, 2021 at 9:01 AM Guozhang Wang <wa...@gmail.com>
> >> wrote:
> >> >
> >> > > Could you share your code around
> >> > >
> >> > > >
> >> > >
> >> > >
> >> >
> >>
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > >
> >> > > That seems to be where NPE is thrown.
> >> > >
> >> > >
> >> > > On Wed, Jan 13, 2021 at 5:46 AM Nitay Kufert <ni...@ironsrc.com>
> >> > wrote:
> >> > >
> >> > > > Hey,
> >> > > > *Without any code change*, just by bumping the kafka version from
> >> 2.5.1
> >> > > to
> >> > > > 2.6.1 (clients only) - my stream application started throwing
> >> > > > NullPointerException (sometimes, not in a predicted pattern).
> >> > > > Maybe it's worth mentioning that I also removed the "UPGRADE_FROM"
> >> conf
> >> > > > that was forgotten there from the older versions.
> >> > > >
> >> > > > We are using Scala 2.12, and the line that throws this exception
> is
> >> > using
> >> > > > flatMapValues:
> >> > > >
> >> > > >
> >> > > > >  inputStream.flatMapValues(_.split) # return type
> >> > > > > KStream[Windowed[String], SingleInputMessage]
> >> > > >
> >> > > >
> >> > > > Where inputStream is of type: KStream[Windowed[String],
> >> InputMessage]
> >> > and
> >> > > > the split method splits this InputMessage into several
> >> > > > SingleInputMessage messages (hence the flat - to avoid
> >> > > > List[SingleInputMessage]).
> >> > > >
> >> > > > The exception:
> >> > > >
> >> > > > > java.lang.NullPointerException: null Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
> >> caught in
> >> > > > > process. taskId=2_2,
> >> > processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException
> >> > > > >
> >> > > >
> >> > > > java.lang.NullPointerException: null at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> com.app.consumer.Utils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > > ... 4 common frames omitted Wrapped by:
> >> > > > > org.apache.kafka.streams.errors.StreamsException: Exception
> >> caught in
> >> > > > > process. taskId=2_2,
> >> > processor=unique_input_message-repartition-source,
> >> > > > > topic=service-unique_input_message-repartition, partition=2,
> >> > > > > offset=318846738, stacktrace=java.lang.NullPointerException at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> com.app.consumer.ServiceUtils$.$anonfun$buildCountersStream$1(ServiceUtils.scala:91)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.scala.FunctionsCompatConversions$FlatValueMapperFromFunction$.$anonfun$asValueMapper$2(FunctionsCompatConversions.scala:62)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:105)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:109)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterProcessor.process(KTableFilter.java:78)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:43)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.SessionCacheFlushListener.apply(SessionCacheFlushListener.java:26)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$setFlushListener$1(MeteredSessionStore.java:97)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:98)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.lambda$initInternal$0(CachingSessionStore.java:76)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.put(CachingSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.CachingSessionStore.remove(CachingSessionStore.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.lambda$remove$3(MeteredSessionStore.java:134)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.state.internals.MeteredSessionStore.remove(MeteredSessionStore.java:131)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$SessionStoreReadWriteDecorator.remove(AbstractReadWriteDecorator.java:221)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.process(KStreamSessionWindowAggregate.java:171)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:85)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:678)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:678)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:695)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1034)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
> >> > > > > at
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
> >> > > > >
> >> > > >
> >> > > > * 2nd line of the exception is because we are using Scala
> >> > > > (FunctionsCompatConversions.scala:62)
> >> > > >
> >> > > > > implicit class FlatValueMapperFromFunction[V, VR](val f: V =>
> >> > > > > Iterable[VR]) extends AnyVal { def asValueMapper: ValueMapper[V,
> >> > > > > JIterable[VR]] = (value: V) => f(value).*asJava* }
> >> > > > >
> >> > > >
> >> > > > The main thing here is that we didn't change anything in the app
> >> code..
> >> > > so
> >> > > > i wonder if it's a new bug OR our implementation bug that somehow
> >> > didn't
> >> > > > happen in 2.5.1 (or previous versions, since this logic is old)
> >> > > >
> >> > > > Thanks and let me know what else can help (i wish i knew how to
> >> > > reproduce,
> >> > > > it happened 6 times during the last day and no idea why.. i'll try
> >> to
> >> > > catch
> >> > > > it with logs)
> >> > > >
> >> > > > --
> >> > > >
> >> > > > Nitay Kufert
> >> > > > Backend Team Leader
> >> > > > [image: ironSource] <http://www.ironsrc.com>
> >> > > >
> >> > > > email nitay.k@ironsrc.com
> >> > > > mobile +972-54-5480021
> >> > > > fax +972-77-5448273
> >> > > > skype nitay.kufert.ssa
> >> > > > 121 Menachem Begin St., Tel Aviv, Israel
> >> > > > ironsrc.com <http://www.ironsrc.com>
> >> > > > [image: linkedin] <https://www.linkedin.com/company/ironsource>
> >> > [image:
> >> > > > twitter] <https://twitter.com/ironsource> [image: facebook]
> >> > > > <https://www.facebook.com/ironSource> [image: googleplus]
> >> > > > <https://plus.google.com/+ironsrc>
> >> > > > This email (including any attachments) is for the sole use of the
> >> > > intended
> >> > > > recipient and may contain confidential information which may be
> >> > protected
> >> > > > by legal privilege. If you are not the intended recipient, or the
> >> > > employee
> >> > > > or agent responsible for delivering it to the intended recipient,
> >> you
> >> > are
> >> > > > hereby notified that any use, dissemination, distribution or
> >> copying of
> >> > > > this communication and/or its content is strictly prohibited. If
> you
> >> > are
> >> > > > not the intended recipient, please immediately notify us by reply
> >> email
> >> > > or
> >> > > > by telephone, delete this email and destroy any copies. Thank you.
> >> > > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >> >
> >> > --
> >> >
> >> > Nitay Kufert
> >> > Backend Team Leader
> >> > [image: ironSource] <http://www.ironsrc.com>
> >> >
> >> > email nitay.k@ironsrc.com
> >> > mobile +972-54-5480021
> >> > fax +972-77-5448273
> >> > skype nitay.kufert.ssa
> >> > 121 Menachem Begin St., Tel Aviv, Israel
> >> > ironsrc.com <http://www.ironsrc.com>
> >> > [image: linkedin] <https://www.linkedin.com/company/ironsource>
> [image:
> >> > twitter] <https://twitter.com/ironsource> [image: facebook]
> >> > <https://www.facebook.com/ironSource> [image: googleplus]
> >> > <https://plus.google.com/+ironsrc>
> >> > This email (including any attachments) is for the sole use of the
> >> intended
> >> > recipient and may contain confidential information which may be
> >> protected
> >> > by legal privilege. If you are not the intended recipient, or the
> >> employee
> >> > or agent responsible for delivering it to the intended recipient, you
> >> are
> >> > hereby notified that any use, dissemination, distribution or copying
> of
> >> > this communication and/or its content is strictly prohibited. If you
> are
> >> > not the intended recipient, please immediately notify us by reply
> email
> >> or
> >> > by telephone, delete this email and destroy any copies. Thank you.
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> > --
> >
> > Nitay Kufert
> > Backend Team Leader
> > [image: ironSource] <http://www.ironsrc.com>
> >
> > email nitay.k@ironsrc.com
> > mobile +972-54-5480021
> > fax +972-77-5448273
> > skype nitay.kufert.ssa
> > 121 Menachem Begin St., Tel Aviv, Israel
> > ironsrc.com <http://www.ironsrc.com>
> > [image: linkedin] <https://www.linkedin.com/company/ironsource> [image:
> > twitter] <https://twitter.com/ironsource> [image: facebook]
> > <https://www.facebook.com/ironSource> [image: googleplus]
> > <https://plus.google.com/+ironsrc>
> > This email (including any attachments) is for the sole use of the
> intended
> > recipient and may contain confidential information which may be protected
> > by legal privilege. If you are not the intended recipient, or the
> employee
> > or agent responsible for delivering it to the intended recipient, you are
> > hereby notified that any use, dissemination, distribution or copying of
> > this communication and/or its content is strictly prohibited. If you are
> > not the intended recipient, please immediately notify us by reply email
> or
> > by telephone, delete this email and destroy any copies. Thank you.
> >
>


-- 
-- Guozhang