Posted to users@kafka.apache.org by Frank Lyaruu <fl...@gmail.com> on 2018/06/18 11:29:18 UTC

NPE in low level Kafka topology

Hi, I've upgraded our 0.11-based streams application to the trunk version,
and I get an intermittent NPE. It is quite a big topology, and I haven't
succeeded in reproducing it on a simpler topology.
It builds the topology, starts Kafka Streams, runs for about 20 seconds, and
then terminates.
It seems that the 'currentNode' in the ProcessorContext is null.

Does this ring a bell for anyone?

[lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
stream-thread
[lowlevel-develop-generation-7aa-lowlevel-e23137ae-d94b-4f17-a684-995320fd426d-StreamThread-12]
Failed to process stream task 0_0 due to the following error:
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:114)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardMessage(OneToManyGroupedProcessor.java:125)
    at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.forwardJoin(OneToManyGroupedProcessor.java:101)
    at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:70)
    at com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor.process(OneToManyGroupedProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:25)
    at com.dexels.kafka.streams.remotejoin.PreJoinProcessor.process(PreJoinProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:48)
    at com.dexels.kafka.streams.remotejoin.StoreProcessor.process(StoreProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:52)
    at com.dexels.kafka.streams.remotejoin.XmlTransformerProcessor.process(XmlTransformerProcessor.java:1)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:288)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:952)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:827)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
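
For readers following the trace, the sketch below is a toy model of how a context that tracks a "current node" can fail inside forward() when that field is null. It is not the Kafka Streams implementation: ToyNode, ToyContext and CurrentNodeSketch are hypothetical names, and the assumption that forward() reads the current node to find its children is a simplification made for illustration. It does not establish why the field is null in the topology above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** Toy model only: these classes are hypothetical and are NOT Kafka Streams internals. */
public class CurrentNodeSketch {

    /** A node in a tiny processor graph. */
    static final class ToyNode {
        final String name;
        final List<ToyNode> children = new ArrayList<>();
        // What the node does with a value; it may call context.forward(...) itself.
        final BiConsumer<ToyContext, String> logic;

        ToyNode(String name, BiConsumer<ToyContext, String> logic) {
            this.name = name;
            this.logic = logic;
        }
    }

    /** Context that remembers which node is currently being processed. */
    static final class ToyContext {
        ToyNode currentNode; // assumption: forward() relies on this being non-null

        void forward(String value) {
            ToyNode previous = currentNode;
            try {
                // Dereferencing a null current node here raises the NPE.
                for (ToyNode child : previous.children) {
                    currentNode = child;
                    child.logic.accept(this, value);
                }
            } finally {
                // Restore the caller's node once all children have run.
                currentNode = previous;
            }
        }
    }

    /** Returns true if forwarding with no current node fails with an NPE. */
    public static boolean forwardWithNullCurrentNodeThrows() {
        ToyContext context = new ToyContext();
        context.currentNode = null; // hypothetical trigger: nothing set the node before forward()
        try {
            context.forward("record");
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Runs source -> transform -> sink and returns what the sink received. */
    public static List<String> runHappyPath() {
        List<String> received = new ArrayList<>();
        ToyNode sink = new ToyNode("sink", (ctx, v) -> received.add(v));
        ToyNode transform = new ToyNode("transform", (ctx, v) -> ctx.forward(v.toUpperCase()));
        ToyNode source = new ToyNode("source", (ctx, v) -> ctx.forward(v));
        source.children.add(transform);
        transform.children.add(sink);

        ToyContext context = new ToyContext();
        context.currentNode = source;
        source.logic.accept(context, "record");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runHappyPath());
        System.out.println(forwardWithNullCurrentNodeThrows());
    }
}
```

The nested forward -> process -> forward calls in the toy mirror the repeating frames in the trace. In this model the failure depends only on the state of the context at the moment forward() is called, not on which processor happens to call it.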

Re: NPE in low level Kafka topology

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Frank,

One issue that I can remember is this one:
https://issues.apache.org/jira/browse/KAFKA-4324

Since I have not seen your full topology building code including
com.dexels.kafka.streams.remotejoin.StoreProcessor,
com.dexels.kafka.streams.remotejoin.PreJoinProcessor and
com.dexels.kafka.streams.remotejoin.ranged.OneToManyGroupedProcessor
implementations I cannot tell if it is related to your issue.

If you believe they are different, do you mind sharing the full chunk of
your topology building code with the above three classes?



Guozhang


On Wed, Jun 20, 2018 at 5:25 AM, Frank Lyaruu <fl...@gmail.com> wrote:

> I didn't get much further. When I run with the 1.1.0 release version the
> stacktrace looks slightly different, but still a very similar NPE, after
> the same amount of time.
> One observation is that I use a few different processors, and it seems
> random which one gets caught in the stack trace.
>
> I've put the description of the topology in another gist:
>
> https://gist.github.com/flyaruu/39fba78aec562ae5d6f11d3add6a0881
>
> I've captured the last second or so before the NPE at debug level here:
>
> https://gist.github.com/flyaruu/43d31de10e03af160b20e9534f13830e
>
> I've increased the heap size (in case of a silent OOM exception), doesn't
> seem to matter
>
> I'm kinda out of ideas.
>
>
>
> On Tue, Jun 19, 2018 at 11:02 AM Frank Lyaruu <fl...@gmail.com> wrote:
>
> > We've tried running a fresh version with yesterday morning's trunk
> > version, with the same result.
> > We're running +- 15 KafkaStreams instances, and the one that fails is the
> > biggest one, with >150 processors.
> > We haven't been able to reproduce this error with smaller sub-sets.
> >
> > I'm now going to try this with the Kafka 1.1.0 release version.
> >
> > regards, Frank
> >
> > On Tue, Jun 19, 2018 at 1:18 AM Guozhang Wang <wa...@gmail.com> wrote:
> >
> >> Hello Frank,
> >>
> >> Your OneToManyGroupedProcessor.java looks fine to me.
> >>
> >> Is it consistently re-producible? What if you restart from fresh using the
> >> trunk version?
> >>
> >>
> >> Guozhang
> >>
> >> On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <fl...@gmail.com> wrote:
> >>
> >> > Yes, here it is:
> >> >
> >> > https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1
> >> >
> >> > It ran completely fine for the last year (and still does), it just does not
> >> > seem to enjoy the upgrade of Kafka Streams.
> >> >
> >> > regards, Frank
> >> >
> >> > On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yu...@gmail.com> wrote:
> >> >
> >> > > Can you show the related code from OneToManyGroupedProcessor ?
> >> > >
> >> > > Thanks
> >> > >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>



-- 
-- Guozhang

Re: NPE in low level Kafka topology

Posted by Frank Lyaruu <fl...@gmail.com>.
I didn't get much further. When I run with the 1.1.0 release version the
stacktrace looks slightly different, but still a very similar NPE, after
the same amount of time.
One observation is that I use a few different processors, and it seems
random which one gets caught in the stack trace.

I've put the description of the topology in another gist:

https://gist.github.com/flyaruu/39fba78aec562ae5d6f11d3add6a0881

I've captured the last second or so before the NPE at debug level here:

https://gist.github.com/flyaruu/43d31de10e03af160b20e9534f13830e

I've increased the heap size (in case of a silent OOM exception), but that
doesn't seem to matter.

I'm kinda out of ideas.




Re: NPE in low level Kafka topology

Posted by Frank Lyaruu <fl...@gmail.com>.
We've tried running a fresh version with yesterday morning's trunk version,
with the same result.
We're running +- 15 KafkaStreams instances, and the one that fails is the
biggest one, with >150 processors.
We haven't been able to reproduce this error with smaller sub-sets.

I'm now going to try this with the Kafka 1.1.0 release version.

regards, Frank


Re: NPE in low level Kafka topology

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Frank,

Your OneToManyGroupedProcessor.java looks fine to me.

Is it consistently reproducible? What happens if you restart from a fresh
state using the trunk version?


Guozhang

On Mon, Jun 18, 2018 at 9:03 AM, Frank Lyaruu <fl...@gmail.com> wrote:

> Yes, here it is:
>
> https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1
>
> It has run completely fine for the last year (and still does); it just does
> not seem to enjoy the upgrade of Kafka Streams.
>
> regards, Frank
>
> On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yu...@gmail.com> wrote:
>
> > Can you show the related code from OneToManyGroupedProcessor?
> >
> > Thanks
> >
> > On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <fl...@gmail.com> wrote:
> >
> > > Hi, I've upgraded our 0.11-based stream application to the trunk version,
> > > and I get an intermittent NPE. It is quite a big topology, and I haven't
> > > succeeded in reproducing it on a simpler topology.
> > > It builds the topology, starts Kafka Streams, runs for about 20 seconds,
> > > and then it terminates.
> > > It seems that the 'currentNode' in the ProcessorContext is null.
> > >
> > > Does this ring a bell for anyone?



-- 
-- Guozhang

Re: NPE in low level Kafka topology

Posted by Frank Lyaruu <fl...@gmail.com>.
Yes, here it is:

https://gist.github.com/flyaruu/84b65d52a01987946b07d21bafe0d5f1

It has run completely fine for the last year (and still does); it just does
not seem to enjoy the upgrade of Kafka Streams.

regards, Frank

On Mon, Jun 18, 2018 at 4:49 PM Ted Yu <yu...@gmail.com> wrote:

> Can you show the related code from OneToManyGroupedProcessor?
>
> Thanks
>
> On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <fl...@gmail.com> wrote:
>
> > Hi, I've upgraded our 0.11-based stream application to the trunk version,
> > and I get an intermittent NPE. It is quite a big topology, and I haven't
> > succeeded in reproducing it on a simpler topology.
> > It builds the topology, starts Kafka Streams, runs for about 20 seconds,
> > and then it terminates.
> > It seems that the 'currentNode' in the ProcessorContext is null.
> >
> > Does this ring a bell for anyone?

Re: NPE in low level Kafka topology

Posted by Ted Yu <yu...@gmail.com>.
Can you show the related code from OneToManyGroupedProcessor?

Thanks

On Mon, Jun 18, 2018 at 4:29 AM, Frank Lyaruu <fl...@gmail.com> wrote:

> Hi, I've upgraded our 0.11-based stream application to the trunk version,
> and I get an intermittent NPE. It is quite a big topology, and I haven't
> succeeded in reproducing it on a simpler topology.
> It builds the topology, starts Kafka Streams, runs for about 20 seconds,
> and then it terminates.
> It seems that the 'currentNode' in the ProcessorContext is null.
>
> Does this ring a bell for anyone?