You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2021/01/20 08:59:00 UTC

[jira] [Closed] (FLINK-20814) The CEP code is not running properly

     [ https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz closed FLINK-20814.
------------------------------------
    Resolution: Not A Problem

> The CEP code is not running properly
> ------------------------------------
>
>                 Key: FLINK-20814
>                 URL: https://issues.apache.org/jira/browse/FLINK-20814
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.12.0
>         Environment: flink1.12.0
> jdk1.8
>            Reporter: little-tomato
>            Priority: Blocker
>
> The cep code is running properly on flink1.11.2,but it is not working properly on flink1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // DataStream : source
> 		DataStream<TemperatureEvent> input = env.fromElements(new TemperatureEvent(1,"Device01", 22.0),
>                 new TemperatureEvent(1,"Device01", 27.1), new TemperatureEvent(2,"Device01", 28.1),
>                 new TemperatureEvent(1,"Device01", 22.2), new TemperatureEvent(3,"Device01", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.3), new TemperatureEvent(4,"Device02", 22.1),
>                 new TemperatureEvent(1,"Device02", 22.4), new TemperatureEvent(5,"Device02", 22.7),
>                 new TemperatureEvent(1,"Device02", 27.0), new TemperatureEvent(6,"Device02", 30.0));
>         
>         Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
>                 .subtype(TemperatureEvent.class)
>                 .where(new SimpleCondition<TemperatureEvent>() {
>                 	@Override
>                     public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getTemperature() >= 26.0) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).where(new SimpleCondition<TemperatureEvent>() {
>                 	@Override
>                 	public boolean filter(TemperatureEvent subEvent) {
>                         if (subEvent.getMachineName().equals("Device02")) {
>                             return true;
>                         }
>                         return false;
>                     }
>                 }).within(Time.seconds(10));
>         DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
>                 .select(
>                         new RichPatternSelectFunction<TemperatureEvent, Alert>() {
>                             /**
> 							 * 
> 							 */
> 							private static final long serialVersionUID = 1L;
> 							@Override
> 							public void open(Configuration parameters) throws Exception {
> 								System.out.println(getRuntimeContext().getUserCodeClassLoader());
> 							}
> 							@Override
>                             public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
>                             	
>                                 return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
>                             }
>                         });
>         patternStream.print();
>         env.execute("CEP on Temperature Sensor");
> it should be output(on flink1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)