You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2017/05/30 11:36:04 UTC
[jira] [Comment Edited] (FLINK-6772) Incorrect ordering of matched
state events in Flink CEP
[ https://issues.apache.org/jira/browse/FLINK-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16029288#comment-16029288 ]
Dawid Wysakowicz edited comment on FLINK-6772 at 5/30/17 11:35 AM:
-------------------------------------------------------------------
Hmm, I could not reproduce this issue.
I tried the following test:
{code}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<String> input = env.fromElements(
"a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"
);
Pattern<String, ?> pattern = Pattern
.<String>begin("start")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("a-");
}
}).times(4).allowCombinations()
.followedByAny("end")
.where(new SimpleCondition<String>() {
public boolean filter(String s) throws Exception {
return s.startsWith("b-");
}
}).times(3).consecutive();
CEP.pattern(input, pattern).select(new PatternSelectFunction<String, String>() {
@Override
public String select(Map<String, List<String>> pattern) throws Exception {
return pattern.toString();
}
}).print();
env.execute();
}
{code}
And the results are as follows:
{code}
1> {start=[a-1, a-2, a-3, a-4], end=[b-1, b-2, b-3]}
{code}
Also checked the code with new, clear project and the results are the same.
was (Author: dawidwys):
Hmm, I could not reproduce this issue.
I tried the following test:
{code}
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
DataStream<String> input = env.fromElements(
"a-1", "a-2", "a-3", "a-4", "b-1", "b-2", "b-3"
);
Pattern<String, ?> pattern = Pattern
.<String>begin("start")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String s) throws Exception {
return s.startsWith("a-");
}
}).times(4).allowCombinations()
.followedByAny("end")
.where(new SimpleCondition<String>() {
public boolean filter(String s) throws Exception {
return s.startsWith("b-");
}
}).times(3).consecutive();
CEP.pattern(input, pattern).select(new PatternSelectFunction<String, String>() {
@Override
public String select(Map<String, List<String>> pattern) throws Exception {
return pattern.toString();
}
}).print();
env.execute();
}
{code}
And the results are as follows:
{code}
1> {start=[a-1, a-2, a-3, a-4], end=[b-1, b-2, b-3]}
{code}
> Incorrect ordering of matched state events in Flink CEP
> -------------------------------------------------------
>
> Key: FLINK-6772
> URL: https://issues.apache.org/jira/browse/FLINK-6772
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Reporter: Tzu-Li (Gordon) Tai
>
> I've stumbled across an unexepected ordering of the matched state events.
> Pattern:
> {code}
> Pattern<String, ?> pattern = Pattern
> .<String>begin("start")
> .where(new IterativeCondition<String>() {
> @Override
> public boolean filter(String s, Context<String> context) throws Exception {
> return s.startsWith("a-");
> }
> }).times(4).allowCombinations()
> .followedByAny("end")
> .where(new IterativeCondition<String>() {
> public boolean filter(String s, Context<String> context) throws Exception {
> return s.startsWith("b-");
> }
> }).times(3).consecutive();
> {code}
> Input event sequence:
> a-1, a-2, a-3, a-4, b-1, b-2, b-3
> On b-3 a matched pattern would be triggered.
> Now, in the {{Map<String, List<IN>>}} map passed via {{select}} in {{PatternSelectFunction}}, the list for the "end" state is:
> b-3, b-1, b-2.
> Based on the timestamp of the events (simply using processing time), the correct order should be b-1, b-2, b-3.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)