Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/15 13:16:04 UTC

[jira] [Commented] (FLINK-6578) SharedBuffer creates self-loops when having elements with same value/timestamp.

    [ https://issues.apache.org/jira/browse/FLINK-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16010482#comment-16010482 ] 

ASF GitHub Bot commented on FLINK-6578:
---------------------------------------

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/3909

    [FLINK-6578] Fix self-loop handling in SharedBuffer.

    This PR addresses: FLINK-6371, FLINK-6536, FLINK-6255, and FLINK-6578.
    
    R @dawidwys 

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink cep-uber-pr

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3909.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3909
    
----
commit dd1784e20baeab97c6af36bd110cff95a2aa1731
Author: kl0u <kk...@gmail.com>
Date:   2017-05-05T11:55:07Z

    [FLINK-6371] [cep] NFA return matched patterns as Map<String, List<T>>.

commit bd7b5270a37a3f134e214aea0e17f1efd9395bbf
Author: kl0u <kk...@gmail.com>
Date:   2017-05-11T09:39:00Z

    [FLINK-6536] [cep] Improve error message in SharedBuffer::put().

commit 72a23bd98cee6a94d31c31b2ea8ff0ddb9757186
Author: kl0u <kk...@gmail.com>
Date:   2017-05-12T14:01:38Z

    [FLINK-6255] [cep] Remove PatternStream.getSideOutput().

commit 96299aa50307119f7bc0654c3ec2acb00f06c68f
Author: kkloudas <kk...@gmail.com>
Date:   2017-05-15T12:33:09Z

    [FLINK-6578] [cep] Fix self-loop handling in SharedBuffer.

commit 07f73391e5c80afda9800b4dc2cde41da1a37acf
Author: kkloudas <kk...@gmail.com>
Date:   2017-05-15T12:49:00Z

    [hotfix] [cep] Remove unused keySelector in operator.

----


> SharedBuffer creates self-loops when having elements with same value/timestamp.
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-6578
>                 URL: https://issues.apache.org/jira/browse/FLINK-6578
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.3.0
>
>
> The following test fails with the current implementation. The looping state accepts the repeated {{middleEvent1}} elements, but the shared buffer cannot distinguish between them (they have the same value and timestamp), so it gets trapped in an infinite loop and eventually runs out of memory.
> {code}
> 	@Test
> 	public void testEagerZeroOrMoreSameElement() {
> 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
> 		Event startEvent = new Event(40, "c", 1.0);
> 		Event middleEvent1 = new Event(41, "a", 2.0);
> 		Event middleEvent2 = new Event(42, "a", 3.0);
> 		Event middleEvent3 = new Event(43, "a", 4.0);
> 		Event end1 = new Event(44, "b", 5.0);
> 		inputEvents.add(new StreamRecord<>(startEvent, 1));
> 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> 		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
> 		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
> 		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
> 		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
> 		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
> 		inputEvents.add(new StreamRecord<>(end1, 7));
> 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
> 			private static final long serialVersionUID = 5726188262756267490L;
> 			@Override
> 			public boolean filter(Event value) throws Exception {
> 				return value.getName().equals("c");
> 			}
> 		}).followedBy("middle").where(new SimpleCondition<Event>() {
> 			private static final long serialVersionUID = 5726188262756267490L;
> 			@Override
> 			public boolean filter(Event value) throws Exception {
> 				return value.getName().equals("a");
> 			}
> 		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
> 			private static final long serialVersionUID = 5726188262756267490L;
> 			@Override
> 			public boolean filter(Event value) throws Exception {
> 				return value.getName().equals("b");
> 			}
> 		});
> 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
> 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
> 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
> 				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
> 				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
> 				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
> 				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
> 				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
> 				Lists.newArrayList(startEvent, middleEvent1, end1),
> 				Lists.newArrayList(startEvent, end1)
> 		));
> 	}
> {code}
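
The failure mode described in the issue can be illustrated with a small standalone sketch. This is not Flink's SharedBuffer code: the `Key` and `ToyBuffer` classes, the `counter` field, and the `run` helper are hypothetical names made up for illustration. The sketch assumes a buffer that identifies entries only by (value, timestamp) and stores one predecessor link per entry. Under that assumption, inserting the same element twice makes the entry its own predecessor, and a backwards traversal never terminates. Adding a per-occurrence counter to the key is one possible way to tell the duplicates apart.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class SelfLoopSketch {

    /** Hypothetical buffer key: value and timestamp, plus an occurrence counter. */
    static final class Key {
        final String value;
        final long timestamp;
        final int counter;

        Key(String value, long timestamp, int counter) {
            this.value = value;
            this.timestamp = timestamp;
            this.counter = counter;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return value.equals(other.value)
                    && timestamp == other.timestamp
                    && counter == other.counter;
        }

        @Override
        public int hashCode() {
            return Objects.hash(value, timestamp, counter);
        }
    }

    /** Toy buffer: maps each key to the key of its predecessor (null for the first entry). */
    static final class ToyBuffer {
        private final Map<Key, Key> predecessor = new HashMap<>();

        void put(Key key, Key previous) {
            predecessor.put(key, previous);
        }

        /** Walks back from 'last'. Returns the path length, or -1 if it exceeds maxSteps. */
        int pathLength(Key last, int maxSteps) {
            int steps = 0;
            Key current = last;
            while (current != null) {
                if (++steps > maxSteps) {
                    return -1; // traversal did not terminate: a cycle was hit
                }
                current = predecessor.get(current);
            }
            return steps;
        }
    }

    /** Inserts a start element followed by the same middle element three times. */
    static int run(boolean useCounter) {
        ToyBuffer buffer = new ToyBuffer();
        Key start = new Key("c", 1, 0);
        buffer.put(start, null);

        Key previous = start;
        for (int i = 0; i < 3; i++) {
            // Without the counter, all three keys are equal and collapse into one entry.
            Key middle = new Key("a", 3, useCounter ? i : 0);
            buffer.put(middle, previous);
            previous = middle;
        }
        return buffer.pathLength(previous, 100);
    }

    public static void main(String[] args) {
        System.out.println("without counter: " + run(false)); // -1 (self-loop)
        System.out.println("with counter: " + run(true));     // 4 (start + 3 middles)
    }
}
```

In the sketch the traversal is capped at 100 steps so that the self-loop shows up as a return value of -1 instead of an endless loop. An unbounded traversal that also collects events along the way would keep allocating, which matches the out-of-memory symptom reported above.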



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)