Posted to issues@flink.apache.org by "Dawid Wysakowicz (Jira)" <ji...@apache.org> on 2021/01/20 08:59:00 UTC
[jira] [Closed] (FLINK-20814) The CEP code is not running properly
[ https://issues.apache.org/jira/browse/FLINK-20814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dawid Wysakowicz closed FLINK-20814.
------------------------------------
Resolution: Not A Problem
> The CEP code is not running properly
> ------------------------------------
>
> Key: FLINK-20814
> URL: https://issues.apache.org/jira/browse/FLINK-20814
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.12.0
> Environment: flink1.12.0
> jdk1.8
> Reporter: little-tomato
> Priority: Blocker
>
> The CEP code runs correctly on Flink 1.11.2, but it does not work correctly on Flink 1.12.0.
> Can somebody help me?
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // DataStream: source
> DataStream<TemperatureEvent> input = env.fromElements(
>     new TemperatureEvent(1, "Device01", 22.0),
>     new TemperatureEvent(1, "Device01", 27.1), new TemperatureEvent(2, "Device01", 28.1),
>     new TemperatureEvent(1, "Device01", 22.2), new TemperatureEvent(3, "Device01", 22.1),
>     new TemperatureEvent(1, "Device02", 22.3), new TemperatureEvent(4, "Device02", 22.1),
>     new TemperatureEvent(1, "Device02", 22.4), new TemperatureEvent(5, "Device02", 22.7),
>     new TemperatureEvent(1, "Device02", 27.0), new TemperatureEvent(6, "Device02", 30.0));
>
> Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start")
>     .subtype(TemperatureEvent.class)
>     // First condition: temperature of at least 26.0.
>     .where(new SimpleCondition<TemperatureEvent>() {
>         @Override
>         public boolean filter(TemperatureEvent subEvent) {
>             return subEvent.getTemperature() >= 26.0;
>         }
>     })
>     // Second condition: the event comes from Device02.
>     .where(new SimpleCondition<TemperatureEvent>() {
>         @Override
>         public boolean filter(TemperatureEvent subEvent) {
>             return subEvent.getMachineName().equals("Device02");
>         }
>     })
>     .within(Time.seconds(10));
> DataStream<Alert> patternStream = CEP.pattern(input, warningPattern)
>     .select(new RichPatternSelectFunction<TemperatureEvent, Alert>() {
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             // Prints the user-code class loader when the function is opened.
>             System.out.println(getRuntimeContext().getUserCodeClassLoader());
>         }
>
>         @Override
>         public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception {
>             return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start"));
>         }
>     });
> patternStream.print();
> env.execute("CEP on Temperature Sensor");
> Expected output (as produced on Flink 1.11.2):
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=27.0, getMachineName=Device02]]]
> Alert [message=Temperature Rise Detected: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]] on machine name: [TemperatureEvent [getTemperature()=30.0, getMachineName=Device02]]]
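
The two expected alerts can be cross-checked without Flink. The sketch below is not the reporter's code: `PatternCheck` and `Reading` are hypothetical names, `Reading` stands in for `TemperatureEvent` and keeps only the two fields the conditions read, and the two chained `where()` conditions are treated as a logical AND, which is how consecutive `where()` calls combine in Flink CEP. It only shows which input events satisfy the pattern; it does not reproduce the `within()` window or any time handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PatternCheck {

    // Hypothetical stand-in for TemperatureEvent: machine name and temperature only.
    static final class Reading {
        final String machineName;
        final double temperature;

        Reading(String machineName, double temperature) {
            this.machineName = machineName;
            this.temperature = temperature;
        }
    }

    // The eleven readings from the report, in the same order
    // (the integer first constructor argument is omitted here).
    static List<Reading> sampleReadings() {
        return Arrays.asList(
            new Reading("Device01", 22.0),
            new Reading("Device01", 27.1), new Reading("Device01", 28.1),
            new Reading("Device01", 22.2), new Reading("Device01", 22.1),
            new Reading("Device02", 22.3), new Reading("Device02", 22.1),
            new Reading("Device02", 22.4), new Reading("Device02", 22.7),
            new Reading("Device02", 27.0), new Reading("Device02", 30.0));
    }

    // Applies both conditions from the pattern as a conjunction and
    // returns the temperatures of the readings that satisfy them.
    static List<Double> matchingTemperatures(List<Reading> readings) {
        List<Double> result = new ArrayList<>();
        for (Reading r : readings) {
            if (r.temperature >= 26.0 && "Device02".equals(r.machineName)) {
                result.add(r.temperature);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [27.0, 30.0]
        System.out.println(matchingTemperatures(sampleReadings()));
    }
}
```

Only the 27.0 and 30.0 readings from Device02 pass both conditions, which agrees with the two alerts listed above. The Device01 readings of 27.1 and 28.1 pass the temperature condition but fail the machine-name condition.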
--
This message was sent by Atlassian Jira
(v8.3.4#803005)