Posted to user@flink.apache.org by Люльченко Юрий Николаевич <un...@mail.ru> on 2021/02/26 12:21:26 UTC
Flink CEP: can't process PatternStream (v 1.12, EventTime mode)
Hello,
I already asked a question earlier today and got an answer: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html , and it is now clear to me how PatternStream works with processing time.
But I need help again: I can't write correct code to run a PatternStream in event-time mode.
I think the problem is in how I assign the watermark strategy.
My code is below; the Flink version is 1.12:
public class Main {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("group.id", "Flink");
        properties.put("bootstrap.servers", "broker:9092");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "test",
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env
                .addSource(consumer)
                .map((MapFunction<String, String>) s -> {
                    // Just getting an object model
                    return model.toString();
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                                .withTimestampAssigner((event, timestamp) -> {
                                    Model model = new Gson().fromJson(event, Model.class);
                                    return model.getServerTime();
                                }));

        stream.print("Stream");

        Pattern<String, String> firstPattern = Pattern
                .<String>begin("first")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String s, Context<String> context) throws Exception {
                        return s.contains("Start");
                    }
                });

        DataStream<String> result = CEP
                .pattern(stream, firstPattern)
                .inEventTime() // default TimeCharacteristic for 1.12
                .process(new PatternProcessFunction<String, String>() {
                    @Override
                    public void processMatch(Map<String, List<String>> map, Context context, Collector<String> collector) throws Exception {
                        collector.collect(map.get("first").get(0));
                    }
                });

        result.print("Result");

        env.execute();
    }
}
Please help me correct the code :)
Thanks, Yuri L.
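
For readers following the watermark question, here is a minimal plain-Java sketch (no Flink dependency) of the arithmetic behind a bounded-out-of-orderness strategy with a 20-second bound, as used in the code above. The class and method names (BoundedOutOfOrdernessSketch, Generator, released) are hypothetical and only for illustration. The sketch assumes the watermark trails the largest timestamp seen by the bound plus one millisecond, and that an event-time operator acts on an event only once the watermark has reached that event's timestamp.

```java
import java.time.Duration;

class BoundedOutOfOrdernessSketch {

    /** Hypothetical stand-in for a bounded-out-of-orderness watermark generator. */
    static final class Generator {
        private final long delayMillis;
        private long maxTimestamp;

        Generator(Duration maxOutOfOrderness) {
            this.delayMillis = maxOutOfOrderness.toMillis();
            // Start high enough that the first watermark does not underflow.
            this.maxTimestamp = Long.MIN_VALUE + delayMillis + 1;
        }

        /** Track the largest event timestamp seen so far. */
        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        /** The watermark trails the largest timestamp by the bound plus 1 ms. */
        long currentWatermark() {
            return maxTimestamp - delayMillis - 1;
        }
    }

    /** Assumption: an event is acted on once the watermark reaches its timestamp. */
    static boolean released(long eventTimestamp, long watermark) {
        return watermark >= eventTimestamp;
    }

    public static void main(String[] args) {
        Generator generator = new Generator(Duration.ofSeconds(20));
        long startEvent = 100_000L; // timestamp of a "Start" event, in ms

        generator.onEvent(startEvent);
        System.out.println(generator.currentWatermark());                       // 79999
        System.out.println(released(startEvent, generator.currentWatermark())); // false

        // Only an event more than 20 s newer pushes the watermark far enough.
        generator.onEvent(120_001L);
        System.out.println(generator.currentWatermark());                       // 100000
        System.out.println(released(startEvent, generator.currentWatermark())); // true
    }
}
```

Under these assumptions, a single "Start" event produces no match by itself: later events with timestamps more than 20 seconds ahead have to arrive before the watermark passes it.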
Re[2]: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)
Posted by Люльченко Юрий Николаевич <un...@mail.ru>.
Dawid,
Thank you again for the reply. It really does look like this happens because of the idle parallel instances.
Best,
Yuri L.
>Friday, 26 February 2021, 15:40 +03:00, from Dawid Wysakowicz <dw...@apache.org>:
>
>Hi,
>What exactly is the problem? Is it that no matches are being generated?
>Usually the problem is idle parallel instances[1]. You need to have data flowing in each of the parallel instances for the watermark to progress. You can also read about this in the context of Kafka partitions[2].
>Best,
>Dawid
>[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
>[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
Люльченко Юрий
Re: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
What exactly is the problem? Is it that no matches are being generated?
Usually the problem is idle parallel instances[1]. You need to have
data flowing in each of the parallel instances for the watermark to
progress. You can also read about this in the context of Kafka partitions[2].
Best,
Dawid
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector
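
To illustrate the point about idle parallel instances, here is a minimal plain-Java sketch (no Flink dependency) of how a downstream operator combines watermarks from several upstream instances: it takes the minimum, so one silent instance holds everything back until it is marked idle. The names (IdleInstanceSketch, Channel, combinedWatermark) are hypothetical, and returning Long.MIN_VALUE when every channel is idle is a simplification made for this sketch.

```java
import java.util.Arrays;
import java.util.List;

class IdleInstanceSketch {

    /** One upstream parallel instance, for example one source subtask. */
    static final class Channel {
        long watermark = Long.MIN_VALUE; // no events seen yet
        boolean idle = false;            // excluded from the minimum when true
    }

    /** Downstream watermark: the minimum over all channels not marked idle. */
    static long combinedWatermark(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel channel : channels) {
            if (!channel.idle) {
                anyActive = true;
                min = Math.min(min, channel.watermark);
            }
        }
        // Simplification for this sketch: no active channel means no progress.
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        Channel busy = new Channel();   // receives data
        Channel silent = new Channel(); // never receives data
        List<Channel> channels = Arrays.asList(busy, silent);

        busy.watermark = 79_999L;
        // The silent channel pins the combined watermark at its initial value.
        System.out.println(combinedWatermark(channels) == Long.MIN_VALUE); // true

        // Once the silent channel is marked idle, the busy one drives progress.
        silent.idle = true;
        System.out.println(combinedWatermark(channels)); // 79999
    }
}
```

In Flink 1.12 the mechanism described in [1] is WatermarkStrategy.withIdleness(Duration), chained onto the strategy, for example forBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1)). The one-minute timeout is only an example value.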