Posted to dev@flink.apache.org by "Dawid Wysakowicz (JIRA)" <ji...@apache.org> on 2017/04/10 14:22:41 UTC

[jira] [Created] (FLINK-6290) SharedBuffer is improperly released when multiple edges between entries

Dawid Wysakowicz created FLINK-6290:
---------------------------------------

             Summary: SharedBuffer is improperly released when multiple edges between entries
                 Key: FLINK-6290
                 URL: https://issues.apache.org/jira/browse/FLINK-6290
             Project: Flink
          Issue Type: Bug
          Components: CEP
    Affects Versions: 1.3.0
            Reporter: Dawid Wysakowicz
            Priority: Critical
             Fix For: 1.3.0


The following test currently fails:
{code}
	@Test
	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>(Event.createTypeSerializer());
		int numberEvents = 8;
		Event[] events = new Event[numberEvents];
		final long timestamp = 1L;

		for (int i = 0; i < numberEvents; i++) {
			events[i] = new Event(i + 1, "e" + (i + 1), i);
		}

		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.0"));
		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, DeweyNumber.fromString("1.1"));
		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, DeweyNumber.fromString("1.0.0"));
		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.0.0.0"));
		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, DeweyNumber.fromString("1.1.0"));

		//simulate IGNORE (next event can point to events[2])
		sharedBuffer.lock("branching", events[2], timestamp);

		sharedBuffer.release("branching", events[4], timestamp);

		//events[1] and events[2] should still be in the buffer
		assertFalse(sharedBuffer.isEmpty());
	}
{code}

The problem is with the {{SharedBuffer#internalRemove}} method:

{code}
private void internalRemove(final SharedBufferEntry<K, V> entry) {
	Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
	entriesToRemove.add(entry);

	while (!entriesToRemove.isEmpty()) {
		SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();

		if (currentEntry.getReferenceCounter() == 0) {
			currentEntry.remove();

			for (SharedBufferEdge<K, V> edge: currentEntry.getEdges()) {
				if (edge.getTarget() != null) {
					edge.getTarget().decreaseReferenceCounter();
					entriesToRemove.push(edge.getTarget());
				}
			}
		}
	}
}
{code}

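When {{currentEntry}} has multiple edges to the same entry, that entry is pushed onto {{entriesToRemove}} once per edge. Once its reference counter reaches 0, it is popped and processed twice, so its own edges are released twice and the reference counters of its targets are decreased too many times.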
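
The effect can be reproduced outside of Flink with a small stand-alone model. The sketch below is only an illustration: {{Node}}, {{removeAsQuoted}}, {{removeGuarded}} and {{buildGraph}} are hypothetical names and not part of {{SharedBuffer}}, and the guard shown is one possible way to avoid the double processing, not necessarily the fix that should go into the code base. Entry {{a}} has two edges to {{b}}, {{b}} has one edge to {{c}}, and {{c}} holds one extra reference that stands in for the {{lock}} call in the test.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class MultiEdgeReleaseSketch {

    /** Hypothetical stand-in for SharedBufferEntry: a reference counter plus outgoing edges. */
    static final class Node {
        final String name;
        final List<Node> targets = new ArrayList<>();
        int refCount;
        boolean removed;

        Node(String name) {
            this.name = name;
        }

        /** Adds an edge to the target and counts it as one reference on the target. */
        void addEdgeTo(Node target) {
            targets.add(target);
            target.refCount++;
        }
    }

    /** Mirrors the traversal quoted above: a target is pushed once per edge. */
    static void removeAsQuoted(Node start) {
        Deque<Node> entriesToRemove = new ArrayDeque<>();
        entriesToRemove.push(start);
        while (!entriesToRemove.isEmpty()) {
            Node current = entriesToRemove.pop();
            if (current.refCount == 0) {
                current.removed = true;
                for (Node target : current.targets) {
                    target.refCount--;
                    entriesToRemove.push(target);
                }
            }
        }
    }

    /** Assumed guard: an entry that was already removed is not processed again. */
    static void removeGuarded(Node start) {
        Deque<Node> entriesToRemove = new ArrayDeque<>();
        entriesToRemove.push(start);
        while (!entriesToRemove.isEmpty()) {
            Node current = entriesToRemove.pop();
            if (current.refCount == 0 && !current.removed) {
                current.removed = true;
                for (Node target : current.targets) {
                    target.refCount--;
                    entriesToRemove.push(target);
                }
            }
        }
    }

    /** Builds a => b (two edges), b -> c (one edge), plus one extra reference on c. */
    static Node[] buildGraph() {
        Node a = new Node("a");
        Node b = new Node("b");
        Node c = new Node("c");
        a.addEdgeTo(b);
        a.addEdgeTo(b);
        b.addEdgeTo(c);
        c.refCount++; // stands in for the lock taken in the test
        return new Node[] {a, b, c};
    }

    public static void main(String[] args) {
        Node[] quoted = buildGraph();
        removeAsQuoted(quoted[0]);
        System.out.println("as quoted: c removed = " + quoted[2].removed);

        Node[] guarded = buildGraph();
        removeGuarded(guarded[0]);
        System.out.println("guarded:   c removed = " + guarded[2].removed);
    }
}
```

In this model the quoted traversal processes {{b}} twice, which drops the counter of {{c}} to 0 and removes it even though it is still referenced. With the guard, {{c}} keeps its one remaining reference and stays in place.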



