Posted to user@flink.apache.org by Steve Bistline <sr...@gmail.com> on 2018/11/16 11:40:50 UTC

Null Pointer Exception

I have a fairly straightforward project that is generating a null pointer
exception and a Java heap space error.

Any thoughts on where to begin debugging this?

I suspect it is in this part of the code somewhere.


Pattern<Event, ?> pattern = Pattern
        .<Event> begin("first event").subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>()
        {
            private static final long serialVersionUID = -6301755149429716724L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
            }
        })
        .next("second")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
            }
        })
        .next("third")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
            }
        })
        .next("fourth")
        .subtype(IoTEvent.class)
        .where(new IterativeCondition<IoTEvent>() {
            private static final long serialVersionUID = 2392863109523984059L;

            @Override
            public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
                return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1());
            }
        })
        .within(Time.seconds(10));

Any help appreciated

Thanks,

SRB

=======================================================

=========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:704)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:500)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:252)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:332)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:235)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:234)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:734)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: java.lang.NullPointerException

Steve Bistline <sr...@gmail.com>
Thu, Nov 15, 8:28 PM (10 hours ago)
to user
More to the story: the cluster also kicked out this error:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "flink-7"
Uncaught error from thread [Uncaught error from thread [Uncaught error from thread [flink-scheduler-1]: flink-akka.remote.default-remote-dispatcher-92]: flink-10]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[Java heap spaceflink, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[Uncaught error from thread [flink-akka.remote.default-remote-dispatcher-96]: Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[flink]
flink]
]
Java heap space, shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for for ActorSystem[flink]
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:411)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:658)
	at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:474)
	at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:469)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at akka.actor.FSM$class.processEvent(FSM.scala:663)
	at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:285)
	at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:657)
	at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:651)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:285)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
java.lang.OutOfMemoryError: Java heap space

Re: Null Pointer Exception

Posted by Ken Krugler <kk...@transpac.com>.
Hi Steve,

I don’t have experience with the Flink CEP, but based on the previous stack trace you posted I’m guessing that for one of the records, value.getAudio_FFT1() returns null.
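
If that guess is right, the failure would come from String.contains, which throws a NullPointerException when its argument is null. Below is a minimal sketch of one possible guard, assuming getAudio_FFT1() returns a String. The class name NullSafeAudioFilter, the helper matchesYell, and the "YELL" value standing in for PatternConstants.AUDIO_YELL are all hypothetical, since the thread does not show them.

```java
import java.util.Locale;

public class NullSafeAudioFilter {

    // Hypothetical placeholder for PatternConstants.AUDIO_YELL;
    // the real constant's value is not shown in the thread.
    static final String AUDIO_YELL = "YELL";

    // Same comparison as the posted filter(), but a null field is
    // treated as "no match" instead of being passed to contains(),
    // which would throw a NullPointerException.
    static boolean matchesYell(String audioFft1) {
        if (audioFft1 == null) {
            return false;
        }
        return AUDIO_YELL.toLowerCase(Locale.ROOT).contains(audioFft1);
    }

    public static void main(String[] args) {
        // A null value no longer throws; it simply does not match.
        System.out.println(matchesYell(null));
        System.out.println(matchesYell("yell"));
        System.out.println(matchesYell("quiet"));
    }
}
```

Each filter() in the pattern could then return matchesYell(value.getAudio_FFT1()). Whether a null field should count as a non-match or be dropped before the pattern is applied is a design choice this sketch does not settle.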

— Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra