You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Kostas Kloudas (JIRA)" <ji...@apache.org> on 2017/05/17 12:59:04 UTC
[jira] [Closed] (FLINK-6578) SharedBuffer creates self-loops when
having elements with same value/timestamp.
[ https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kostas Kloudas closed FLINK-6578.
---------------------------------
Resolution: Fixed
Merged at 8e4db423b79580de0cf66e905f8a66c12ea3748a in master.
> SharedBuffer creates self-loops when having elements with same value/timestamp.
> -------------------------------------------------------------------------------
>
> Key: FLINK-6578
> URL: https://issues.apache.org/jira/browse/FLINK-6578
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.3.0
>
>
> This is a test that fails with the current implementation due to the fact that the looping state accepts the two {{middleEvent1}} elements but the shared buffer cannot distinguish between them and gets trapped in an infinite loop leading to running out of memory.
> {code}
> @Test
> public void testEagerZeroOrMoreSameElement() {
> List<StreamRecord<Event>> inputEvents = new ArrayList<>();
> Event startEvent = new Event(40, "c", 1.0);
> Event middleEvent1 = new Event(41, "a", 2.0);
> Event middleEvent2 = new Event(42, "a", 3.0);
> Event middleEvent3 = new Event(43, "a", 4.0);
> Event end1 = new Event(44, "b", 5.0);
> inputEvents.add(new StreamRecord<>(startEvent, 1));
> inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> inputEvents.add(new StreamRecord<>(middleEvent2, 4));
> inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
> inputEvents.add(new StreamRecord<>(middleEvent3, 6));
> inputEvents.add(new StreamRecord<>(middleEvent3, 6));
> inputEvents.add(new StreamRecord<>(end1, 7));
> Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
> private static final long serialVersionUID = 5726188262756267490L;
> @Override
> public boolean filter(Event value) throws Exception {
> return value.getName().equals("c");
> }
> }).followedBy("middle").where(new SimpleCondition<Event>() {
> private static final long serialVersionUID = 5726188262756267490L;
> @Override
> public boolean filter(Event value) throws Exception {
> return value.getName().equals("a");
> }
> }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
> private static final long serialVersionUID = 5726188262756267490L;
> @Override
> public boolean filter(Event value) throws Exception {
> return value.getName().equals("b");
> }
> });
> NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
> final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
> compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
> Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
> Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
> Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
> Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
> Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
> Lists.newArrayList(startEvent, middleEvent1, end1),
> Lists.newArrayList(startEvent, end1)
> ));
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)