You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2017/07/27 08:44:00 UTC

[jira] [Created] (FLINK-7280) Wrong clearing SharedBuffer of Equal elements with same Timestamp

Dawid Wysakowicz created FLINK-7280:
---------------------------------------

             Summary: Wrong clearing SharedBuffer of Equal elements with same Timestamp
                 Key: FLINK-7280
                 URL: https://issues.apache.org/jira/browse/FLINK-7280
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.1, 1.4.0
            Reporter: Dawid Wysakowicz
            Assignee: Dawid Wysakowicz


The following tests fail right now:

{code}
/**
 * Reproduction case for FLINK-7280 using a four-stage pattern
 * {@code a followedBy b followedBy c followedBy d}.
 *
 * <p>The "a" event is fed with timestamp 1; the "b", "c" and "d" events are all fed with
 * timestamp 2. The test expects exactly one match, {@code (a, b, c, d)}, and then asserts
 * that {@code nfa.isEmpty()} holds once all input has been processed.
 */
@Test
public void testClearingBuffer() throws Exception {
	Event eventA = new Event(40, "a", 1.0);
	Event eventB = new Event(41, "b", 2.0);
	Event eventC = new Event(41, "c", 2.0);
	Event eventD = new Event(41, "d", 2.0);

	// Only "a" has its own timestamp; the remaining three records share timestamp 2.
	List<StreamRecord<Event>> records = new ArrayList<>();
	records.add(new StreamRecord<>(eventA, 1));
	records.add(new StreamRecord<>(eventB, 2));
	records.add(new StreamRecord<>(eventC, 2));
	records.add(new StreamRecord<>(eventD, 2));

	// One name-based condition per pattern stage.
	SimpleCondition<Event> isA = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("a");
		}
	};
	SimpleCondition<Event> isB = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("b");
		}
	};
	SimpleCondition<Event> isC = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("c");
		}
	};
	SimpleCondition<Event> isD = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("d");
		}
	};

	Pattern<Event, ?> sequence = Pattern.<Event>begin("a").where(isA)
		.followedBy("b").where(isB)
		.followedBy("c").where(isC)
		.followedBy("d").where(isD);

	NFA<Event> automaton = NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
	List<List<Event>> matches = feedNFA(records, automaton);

	// Exactly one complete match is expected.
	compareMaps(matches, Lists.<List<Event>>newArrayList(
		Lists.newArrayList(eventA, eventB, eventC, eventD)
	));

	// This is the assertion the issue is about: nothing may remain in the NFA afterwards.
	assertTrue(automaton.isEmpty());
}
{code}

{code}
/**
 * Reproduction case for FLINK-7280 using a looping pattern that ends in an {@code until}
 * clause: {@code a followedBy d oneOrMore until (number of "d" events == 3)}.
 *
 * <p>The "a" event is fed with timestamp 1, three "d" events constructed with identical
 * field values are fed with timestamp 2, and a fourth such "d" event is fed with
 * timestamp 4. The test expects the matches {@code (a, d1, d2, d3)}, {@code (a, d1, d2)}
 * and {@code (a, d1)}, and then asserts that {@code nfa.isEmpty()} holds.
 */
@Test
public void testClearingBufferWithUntilAtTheEnd() throws Exception {
	Event start = new Event(40, "a", 1.0);
	Event firstD = new Event(41, "d", 2.0);
	Event secondD = new Event(41, "d", 2.0);
	Event thirdD = new Event(41, "d", 2.0);
	Event fourthD = new Event(41, "d", 2.0);

	// The first three "d" records share timestamp 2; the last one arrives at timestamp 4.
	List<StreamRecord<Event>> records = new ArrayList<>();
	records.add(new StreamRecord<>(start, 1));
	records.add(new StreamRecord<>(firstD, 2));
	records.add(new StreamRecord<>(secondD, 2));
	records.add(new StreamRecord<>(thirdD, 2));
	records.add(new StreamRecord<>(fourthD, 4));

	SimpleCondition<Event> isA = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("a");
		}
	};
	SimpleCondition<Event> isD = new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event event) throws Exception {
			return event.getName().equals("d");
		}
	};
	// Stop condition: true once three events have been collected for the "d" pattern.
	IterativeCondition<Event> threeDsCollected = new IterativeCondition<Event>() {
		@Override
		public boolean filter(Event event, Context<Event> context) throws Exception {
			int collected = Iterators.size(context.getEventsForPattern("d").iterator());
			return collected == 3;
		}
	};

	Pattern<Event, ?> sequence = Pattern.<Event>begin("a").where(isA)
		.followedBy("d").where(isD)
		.oneOrMore().until(threeDsCollected);

	NFA<Event> automaton = NFACompiler.compile(sequence, Event.createTypeSerializer(), false);
	List<List<Event>> matches = feedNFA(records, automaton);

	compareMaps(matches, Lists.<List<Event>>newArrayList(
		Lists.newArrayList(start, firstD, secondD, thirdD),
		Lists.newArrayList(start, firstD, secondD),
		Lists.newArrayList(start, firstD)
	));

	// This is the assertion the issue is about: nothing may remain in the NFA afterwards.
	assertTrue(automaton.isEmpty());
}
{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)