Posted to user@flink.apache.org by Люльченко Юрий Николаевич <un...@mail.ru> on 2021/02/26 12:21:26 UTC

Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Hello,
 
I already asked a question today and got a solution: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html , and it is now clear to me how PatternStream works with processing time.

But I need help again: I can't write working code to run a PatternStream in event-time mode.
I think the problem is in how I assign the watermark strategy.

My code is below; the Flink version is 1.12:
 
public class Main {
 
    public static void main(String[] args) throws Exception {
 
        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");
 
 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);
 
        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    //  Just getting an object model
                    return model.toString();
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                        .withTimestampAssigner((event, timestamp) -> {
                            Model model = new Gson().fromJson(event, Model.class);
                            return model.getServerTime();
                        }));
 
        stream.print("Stream");
 
 
 
        Pattern<String, String> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) throws Exception {
                        return s.contains("Start");
                    }
                });
 
        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                .inEventTime() // event time is the default TimeCharacteristic in 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
                        collector.collect(map.get("first").get(0));
                    }
                });
 
        result.print("Result");
 
        env.execute();
    }
 
}
 
Please help me correct the code :)
 
Thanks, Yuri L.

Re[2]: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Posted by Люльченко Юрий Николаевич <un...@mail.ru>.
Dawid,

Thank you again for the reply. It really does look like this is caused by idle parallel instances.
 
Best,
Yuri L.
  
 
 
Люльченко Юрий
 

Re: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

What exactly is the problem? Is it that no pattern matches are being emitted?

Usually the problem is idle parallel instances [1]. Data has to be flowing
through every parallel instance for the watermark to advance. You can also
read about this in the context of Kafka partitions [2].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
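
Editor's note: the effect described above can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink's actual implementation. The class and method names (WatermarkSketch, combinedWatermark, boundedOutOfOrdernessWatermark) are hypothetical and exist only for this illustration. It assumes that a downstream operator takes the minimum watermark over its non-idle inputs, and that a bounded-out-of-orderness generator emits the largest timestamp seen minus the bound minus 1 ms.

```java
/** Illustrative model of how one silent parallel instance holds back the watermark. */
final class WatermarkSketch {

    /** Stand-in for "this instance has not emitted any watermark yet". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Watermark of a bounded-out-of-orderness generator after seeing maxTimestamp. */
    static long boundedOutOfOrdernessWatermark(long maxTimestampMillis, long boundMillis) {
        return maxTimestampMillis - boundMillis - 1;
    }

    /**
     * Watermark seen by a downstream operator (for example the CEP operator):
     * the minimum over all inputs that are not marked idle.
     */
    static long combinedWatermark(long[] inputWatermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are ignored when combining
            }
            anyActive = true;
            min = Math.min(min, inputWatermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Instance 0 reads a partition without data; instance 1 saw an event at t = 100000 ms.
        long active = boundedOutOfOrdernessWatermark(100_000L, 20_000L);
        long[] watermarks = {NO_WATERMARK, active};

        // No instance marked idle: the combined watermark never leaves its initial value,
        // so event-time CEP keeps buffering events and emits no matches.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));

        // Instance 0 marked idle: the combined watermark follows instance 1.
        System.out.println(combinedWatermark(watermarks, new boolean[] {true, false}));
    }
}
```

In Flink 1.12 itself, the usual remedies are the ones covered in [1] and [2]: either add withIdleness(Duration) to the WatermarkStrategy so that silent instances are marked idle, or pass the strategy to the FlinkKafkaConsumer via its assignTimestampsAndWatermarks method so that watermarks are generated per Kafka partition. Reducing the source parallelism to the number of partitions that actually receive data should have a similar effect.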
