You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Steve Bistline <sr...@gmail.com> on 2018/11/20 18:09:39 UTC

Exception occurred while processing valve output watermark & NullPointerException

Any guidance would be most appreciated.

Thx

Steve
===========================================

java.lang.RuntimeException: Exception occurred while processing valve output watermark:
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
	... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
	... 14 more
Caused by: java.lang.NullPointerException
	at java.lang.String.contains(String.java:2133)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
	... 19 more



==================================================


The code


      // Consume the data streams from AWS Kinesis stream
        DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
                pt.getRequired("stream"),
                new EventSchema(),
                kinesisConsumerConfig))
                .name("Kinesis Stream Consumer");

       //dataStream.print();

        DataStream<Event> kinesisStream = dataStream
                .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
                .map(event -> (IoTEvent) event);

        // Prints the mapped records from the Kinesis stream

        //kinesisStream.print();


        Pattern<Event, ?> pattern = Pattern
                .<Event> begin("first event").subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>()
                {
                    //private static final long serialVersionUID =
-6301755149429716724L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .next("second")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    //private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .next("third")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                   }
                })
                .next("fourth")
                .subtype(IoTEvent.class)
                .where(new IterativeCondition<IoTEvent>() {
                    private static final long serialVersionUID =
2392863109523984059L;

                    @Override
                    public boolean filter(IoTEvent value,
Context<IoTEvent> ctx) throws Exception {
                        return
PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1()
);
                    }
                })
                .within(Time.seconds(10));


        // Match the pattern in the input data stream
        PatternStream<Event> patternStream =
CEP.pattern(kinesisStream, pattern);

        // Detects MOTION pattern match and alert
        DataStream<Alert> alerts = patternStream.select(
                new PatternSelectFunction<Event, Alert>() {
                    @Override
                    public Alert select(Map<String, List<Event>>
pattern) throws Exception {
                        Alert alert = new Alert(pattern);
                        System.out.printf("AUDIO ALERT\n");


                        return alert;
                    }

        }).name("Audio Alert Sink");

Re: Exception occurred while processing valve output watermark & NullPointerException

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I think vino is right. It seems that the NullPointerException comes from
your condition. Please add handling of the situation when the string
that you are comparing is null.

Best,

Dawid


On 21/11/2018 04:32, vino yang wrote:
> Hi Steve,
>
> It seems the NPE is caused by a property of the IoTEvent instance.
> Can you make sure the property is not null?
>
> Thanks, vino.
>
> Steve Bistline <srbistline.tech@gmail.com
> <ma...@gmail.com>> 于2018年11月21日周三 上午2:09写道:
>
>     Any guidance would be most appreciated.
>
>     Thx
>
>     Steve
>     ===========================================
>
>     java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
>     	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
>     	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>     	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>     	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>     	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     	at java.lang.Thread.run(Thread.java:748)
>     Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>     	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
>     	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>     	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
>     	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
>     	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
>     	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>     	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
>     	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>     	... 7 more
>     Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
>     	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
>     	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
>     	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
>     	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
>     	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
>     	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
>     	... 14 more
>     Caused by: java.lang.NullPointerException
>     	at java.lang.String.contains(String.java:2133)
>     	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
>     	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
>     	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
>     	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
>     	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
>     	... 19 more
>
>     ==================================================
>
>     The code
>
>           // Consume the data streams from AWS Kinesis stream DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>                     pt.getRequired("stream"), new EventSchema(), kinesisConsumerConfig))
>                     .name("Kinesis Stream Consumer"); //dataStream.print(); DataStream<Event> kinesisStream = dataStream
>                     .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>                     .map(event -> (IoTEvent) event); // Prints the mapped records from the Kinesis stream
>     //kinesisStream.print();Pattern<Event, ?> pattern = Pattern
>                     .<Event> begin("first event").subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>()
>                     {
>                         //private static final long serialVersionUID = -6301755149429716724L; @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); }
>                     })
>                     .next("second")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         //private static final long serialVersionUID = 2392863109523984059L; @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); }
>                     })
>                     .next("third")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         private static final long serialVersionUID = 2392863109523984059L; @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); }
>                     })
>                     .next("fourth")
>                     .subtype(IoTEvent.class)
>                     .where(new IterativeCondition<IoTEvent>() {
>                         private static final long serialVersionUID = 2392863109523984059L; @Override
>                         public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                             return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() ); }
>                     })
>                     .within(Time.seconds(10)); // Match the pattern in the input data stream PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern); // Detects MOTION pattern match and alert DataStream<Alert> alerts = patternStream.select(
>                     new PatternSelectFunction<Event, Alert>() {
>                         @Override
>                         public Alert select(Map<String, List<Event>> pattern) throws Exception {
>                             Alert alert = new Alert(pattern); System.out.printf("AUDIO ALERT\n"); return alert; }
>
>             }).name("Audio Alert Sink");
>

Re: Exception occurred while processing valve output watermark & NullPointerException

Posted by vino yang <ya...@gmail.com>.
Hi Steve,

It seems the NPE is caused by a property of the IoTEvent instance. Can you
make sure the property is not null?

Thanks, vino.

Steve Bistline <sr...@gmail.com> 于2018年11月21日周三 上午2:09写道:

> Any guidance would be most appreciated.
>
> Thx
>
> Steve
> ===========================================
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:284)
> 	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> 	at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:590)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:279)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:769)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
> 	... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
> 	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
> 	at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
> 	at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
> 	at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.lambda$onEventTime$0(AbstractKeyedCEPPatternOperator.java:282)
> 	... 14 more
> Caused by: java.lang.NullPointerException
> 	at java.lang.String.contains(String.java:2133)
> 	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:119)
> 	at com.amazonaws.prediction.FlinkCEPAudio$4.filter(FlinkCEPAudio.java:114)
> 	at org.apache.flink.cep.pattern.conditions.AndCondition.filter(AndCondition.java:43)
> 	at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
> 	at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
> 	... 19 more
>
>
>
> ==================================================
>
>
> The code
>
>
>       // Consume the data streams from AWS Kinesis stream
>         DataStream<Event> dataStream = env.addSource(new FlinkKinesisConsumer<>(
>                 pt.getRequired("stream"),
>                 new EventSchema(),
>                 kinesisConsumerConfig))
>                 .name("Kinesis Stream Consumer");
>
>        //dataStream.print();
>
>         DataStream<Event> kinesisStream = dataStream
>                 .assignTimestampsAndWatermarks(new TimeLagWatermarkGenerator())
>                 .map(event -> (IoTEvent) event);
>
>         // Prints the mapped records from the Kinesis stream
>
>         //kinesisStream.print();
>
>
>         Pattern<Event, ?> pattern = Pattern
>                 .<Event> begin("first event").subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>()
>                 {
>                     //private static final long serialVersionUID = -6301755149429716724L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .next("second")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     //private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .next("third")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                    }
>                 })
>                 .next("fourth")
>                 .subtype(IoTEvent.class)
>                 .where(new IterativeCondition<IoTEvent>() {
>                     private static final long serialVersionUID = 2392863109523984059L;
>
>                     @Override
>                     public boolean filter(IoTEvent value, Context<IoTEvent> ctx) throws Exception {
>                         return PatternConstants.AUDIO_YELL.toLowerCase().contains(value.getAudio_FFT1() );
>                     }
>                 })
>                 .within(Time.seconds(10));
>
>
>         // Match the pattern in the input data stream
>         PatternStream<Event> patternStream = CEP.pattern(kinesisStream, pattern);
>
>         // Detects MOTION pattern match and alert
>         DataStream<Alert> alerts = patternStream.select(
>                 new PatternSelectFunction<Event, Alert>() {
>                     @Override
>                     public Alert select(Map<String, List<Event>> pattern) throws Exception {
>                         Alert alert = new Alert(pattern);
>                         System.out.printf("AUDIO ALERT\n");
>
>
>                         return alert;
>                     }
>
>         }).name("Audio Alert Sink");
>
>