Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2017/04/21 12:31:04 UTC

[jira] [Resolved] (FLINK-6290) SharedBuffer is improperly released when multiple edges between entries

     [ https://issues.apache.org/jira/browse/FLINK-6290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-6290.
---------------------------------
    Resolution: Fixed

Fixed via c0ea74a0a4eedf4fe73e8f383e837ea373216476

> SharedBuffer is improperly released when multiple edges between entries
> -----------------------------------------------------------------------
>
>                 Key: FLINK-6290
>                 URL: https://issues.apache.org/jira/browse/FLINK-6290
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.3.0
>
>
> The test below currently fails:
> {code}
> 	@Test
> 	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
> 		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
> 		int numberEvents = 8;
> 		Event[] events = new Event[numberEvents];
> 		final long timestamp = 1L;
> 		for (int i = 0; i < numberEvents; i++) {
> 			events[i] = new Event(i + 1, "e" + (i + 1), i);
> 		}
> 		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
> 		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
> 		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
> 		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
> 		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
> 		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));
> 		//simulate IGNORE (next event can point to events[2])
> 		sharedBuffer.lock("branching", events[2], timestamp);
> 		sharedBuffer.release("branching", events[4], timestamp);
> 		//There should be still events[1] and events[2] in the buffer
> 		assertFalse(sharedBuffer.isEmpty());
> 	}
> {code}
> The problem is with the {{SharedBuffer#internalRemove}} method:
> {code}
> private void internalRemove(final SharedBufferEntry<K, V> entry) {
> 		Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
> 		entriesToRemove.add(entry);
> 		while (!entriesToRemove.isEmpty()) {
> 			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
> 			if (currentEntry.getReferenceCounter() == 0) {
> 				currentEntry.remove();
> 				for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
> 					if (edge.getTarget() != null) {
> 						edge.getTarget().decreaseReferenceCounter();
> 						entriesToRemove.push(edge.getTarget());
> 					}
> 				}
> 			}
> 		}
> 	}
> {code}
> This happens when currentEntry has multiple edges to the same entry. That entry is added to entriesToRemove twice, so its edges are removed twice, and processing the second edge can potentially bring the referenceCounter of both of those entries down to 0. As a result, the entry is removed twice.
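
The double removal described above can be reproduced outside Flink with a small stand-alone model. The sketch below is only an illustration: the class SharedBufferReleaseSketch, its Entry type, and the skipAlreadyRemoved flag are hypothetical names, not part of Flink's SharedBuffer, and the guard shown is one possible way to avoid the problem, not necessarily what the referenced commit does. Entry a has two edges to b, b has one edge to c, and c carries one extra reference to simulate a lock.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SharedBufferReleaseSketch {

    /** Hypothetical stand-in for a buffer entry; not Flink's SharedBufferEntry. */
    static final class Entry {
        final String name;
        final List<Entry> targets = new ArrayList<>(); // one element per outgoing edge
        int refCount;   // incoming edges plus explicit locks
        int removals;   // how many times this entry has been removed

        Entry(String name) {
            this.name = name;
        }

        void addEdgeTo(Entry target) {
            targets.add(target);
            target.refCount++;
        }
    }

    /** Mirrors the loop from the report; the flag enables an assumed guard. */
    static void release(Entry start, boolean skipAlreadyRemoved) {
        Deque<Entry> toRemove = new ArrayDeque<>();
        toRemove.push(start);
        while (!toRemove.isEmpty()) {
            Entry current = toRemove.pop();
            if (skipAlreadyRemoved && current.removals > 0) {
                continue; // already handled once, do not touch its edges again
            }
            if (current.refCount == 0) {
                current.removals++;
                for (Entry target : current.targets) {
                    target.refCount--;
                    toRemove.push(target);
                }
            }
        }
    }

    /** Builds a -> b (two edges), b -> c (one edge), with c locked once. */
    static Entry[] buildGraph() {
        Entry a = new Entry("a");
        Entry b = new Entry("b");
        Entry c = new Entry("c");
        a.addEdgeTo(b);
        a.addEdgeTo(b);
        b.addEdgeTo(c);
        c.refCount++; // simulated lock: c should survive releasing a
        return new Entry[] {a, b, c};
    }

    public static void main(String[] args) {
        for (boolean guarded : new boolean[] {false, true}) {
            Entry[] g = buildGraph();
            release(g[0], guarded);
            System.out.println("guarded=" + guarded
                + " b.removals=" + g[1].removals
                + " c.removals=" + g[2].removals
                + " c.refCount=" + g[2].refCount);
        }
    }
}
```

Without the guard, b is popped twice with a counter of 0, so its edge to c is decremented twice and the locked entry c is removed as well. With the guard, b is removed once and c keeps its remaining reference.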



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)