You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2017/07/27 08:44:00 UTC
[jira] [Created] (FLINK-7280) Wrong clearing SharedBuffer of Equal
elements with same Timestamp
Dawid Wysakowicz created FLINK-7280:
---------------------------------------
Summary: Wrong clearing SharedBuffer of Equal elements with same Timestamp
Key: FLINK-7280
URL: https://issues.apache.org/jira/browse/FLINK-7280
Project: Flink
Issue Type: Bug
Components: CEP
Affects Versions: 1.3.1, 1.4.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
The following tests fail right now:
{code}
/**
 * Reproduces FLINK-7280 for a strict four-stage pattern {@code a -> b -> c -> d}.
 *
 * <p>The "b", "c" and "d" events are all created with the same numeric arguments
 * ({@code 41} and {@code 2.0}) and are all emitted with timestamp {@code 2}, so they
 * differ only by name. The test expects exactly one match containing all four events
 * and expects the NFA to report itself as empty afterwards.
 */
@Test
public void testClearingBuffer() throws Exception {
    // Events: one "a" followed by three events that share timestamp 2.
    Event startEvent = new Event(40, "a", 1.0);
    Event secondEvent = new Event(41, "b", 2.0);
    Event thirdEvent = new Event(41, "c", 2.0);
    Event lastEvent = new Event(41, "d", 2.0);

    List<StreamRecord<Event>> records = new ArrayList<>();
    records.add(new StreamRecord<>(startEvent, 1));
    records.add(new StreamRecord<>(secondEvent, 2));
    records.add(new StreamRecord<>(thirdEvent, 2));
    records.add(new StreamRecord<>(lastEvent, 2));

    // One name-based condition per pattern stage.
    SimpleCondition<Event> nameIsA = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    };
    SimpleCondition<Event> nameIsB = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("b");
        }
    };
    SimpleCondition<Event> nameIsC = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("c");
        }
    };
    SimpleCondition<Event> nameIsD = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("d");
        }
    };

    Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(nameIsA)
        .followedBy("b").where(nameIsB)
        .followedBy("c").where(nameIsC)
        .followedBy("d").where(nameIsD);

    NFA<Event> automaton = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

    List<List<Event>> matches = feedNFA(records, automaton);

    // Exactly one match is expected: a, b, c, d in that order.
    compareMaps(matches, Lists.<List<Event>>newArrayList(
        Lists.newArrayList(startEvent, secondEvent, thirdEvent, lastEvent)
    ));

    // After the match has been emitted nothing should remain buffered in the NFA.
    assertTrue(automaton.isEmpty());
}
{code}
{code}
/**
 * Reproduces FLINK-7280 for a pattern that ends with a looping stage closed by
 * {@code until}: {@code a -> d+ until(...)}.
 *
 * <p>All four "d" events are created with identical constructor arguments; the first
 * three are emitted with timestamp {@code 2} and the fourth with timestamp {@code 4}.
 * The stop condition returns {@code true} once the events collected for pattern "d"
 * number exactly three. The test expects three matches and expects the NFA to report
 * itself as empty afterwards.
 */
@Test
public void testClearingBufferWithUntilAtTheEnd() throws Exception {
    Event startEvent = new Event(40, "a", 1.0);
    Event firstD = new Event(41, "d", 2.0);
    Event secondD = new Event(41, "d", 2.0);
    Event thirdD = new Event(41, "d", 2.0);
    Event fourthD = new Event(41, "d", 2.0);

    List<StreamRecord<Event>> records = new ArrayList<>();
    records.add(new StreamRecord<>(startEvent, 1));
    records.add(new StreamRecord<>(firstD, 2));
    records.add(new StreamRecord<>(secondD, 2));
    records.add(new StreamRecord<>(thirdD, 2));
    // The last "d" is the only one with a later timestamp.
    records.add(new StreamRecord<>(fourthD, 4));

    SimpleCondition<Event> nameIsA = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("a");
        }
    };
    SimpleCondition<Event> nameIsD = new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.getName().equals("d");
        }
    };
    // Stop condition: true when exactly three events have been collected for "d".
    IterativeCondition<Event> threeDsCollected = new IterativeCondition<Event>() {
        @Override
        public boolean filter(Event value, Context<Event> ctx) throws Exception {
            return Iterators.size(ctx.getEventsForPattern("d").iterator()) == 3;
        }
    };

    Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(nameIsA)
        .followedBy("d").where(nameIsD)
        .oneOrMore().until(threeDsCollected);

    NFA<Event> automaton = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);

    List<List<Event>> matches = feedNFA(records, automaton);

    // Expected matches: "a" followed by the first three, two and one "d" events.
    compareMaps(matches, Lists.<List<Event>>newArrayList(
        Lists.newArrayList(startEvent, firstD, secondD, thirdD),
        Lists.newArrayList(startEvent, firstD, secondD),
        Lists.newArrayList(startEvent, firstD)
    ));

    // Nothing should remain buffered in the NFA once processing is done.
    assertTrue(automaton.isEmpty());
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)