You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 06:44:55 UTC

[flink] branch master updated: [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f130e4b  [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
f130e4b is described below

commit f130e4bc259746b1542a2f4d8907d3b35195feb8
Author: liyafan82 <42...@users.noreply.github.com>
AuthorDate: Fri Jul 5 14:44:45 2019 +0800

    [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
---
 .../cep/nfa/sharedbuffer/SharedBufferAccessor.java | 45 +++++++++++-----------
 .../cep/nfa/sharedbuffer/SharedBufferTest.java     | 38 ++++++++++++++++++
 2 files changed, 61 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
index 2613f9d..c92df1b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
@@ -235,31 +235,32 @@ public class SharedBufferAccessor<V> implements AutoCloseable {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseNode(final NodeId node) throws Exception {
-		Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node);
-		if (sharedBufferNode != null) {
-			if (sharedBufferNode.release()) {
-				removeNode(node, sharedBufferNode.getElement());
-			} else {
-				sharedBuffer.upsertEntry(node, sharedBufferNode);
+		// explicit work stack (replaces recursion) holding the ids of all nodes that still need to be released.
+		Stack<NodeId> nodesToExamine = new Stack<>();
+		nodesToExamine.push(node);
+
+		while (!nodesToExamine.isEmpty()) {
+			NodeId curNode = nodesToExamine.pop();
+			Lockable<SharedBufferNode> curBufferNode = sharedBuffer.getEntry(curNode);
+			// a missing entry only ends this path (the recursive version just returned); keep draining the stack
+			if (curBufferNode == null) {
+				continue;
 			}
-		}
-	}
 
-	/**
-	 * Removes the {@code SharedBufferNode}, when the ref is decreased to zero, and also
-	 * decrease the ref of the edge on this node.
-	 *
-	 * @param node id of the entry
-	 * @param sharedBufferNode the node body to be removed
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
-		sharedBuffer.removeEntry(node);
-		EventId eventId = node.getEventId();
-		releaseEvent(eventId);
+			if (curBufferNode.release()) {
+				// release() returned true: remove the node itself and release its event first
+				sharedBuffer.removeEntry(curNode);
+				releaseEvent(curNode.getEventId());
 
-		for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
-			releaseNode(sharedBufferEdge.getTarget());
+				for (SharedBufferEdge sharedBufferEdge : curBufferNode.getElement().getEdges()) {
+					NodeId targetId = sharedBufferEdge.getTarget();
+					if (targetId != null) {
+						nodesToExamine.push(targetId);
+					}
+				}
+			} else {
+				sharedBuffer.upsertEntry(curNode, curBufferNode);
+			}
 		}
 	}
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 0583e8b..abd5b8b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -280,4 +280,42 @@ public class SharedBufferTest extends TestLogger {
 		assertEquals(4, sharedBuffer.getSharedBufferNodeSize());
 	}
 
+	/**
+	 * Tests releasing a node that has a long path (100000 nodes) to the terminal node, i.e. the node without an outgoing edge.
+	 * @throws Exception if any shared buffer operation performed by the test fails.
+	 */
+	@Test
+	public void testReleaseNodesWithLongPath() throws Exception {
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+
+		final int numberEvents = 100000;
+		Event[] events = new Event[numberEvents];
+		EventId[] eventIds = new EventId[numberEvents];
+		NodeId[] nodeIds = new NodeId[numberEvents];
+
+		final long timestamp = 1L;
+		// register every event up front; the returned ids are passed to put() when the nodes are created below
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+		}
+
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			// build one long chain: each node is put with the id of the previously created node (null for the first)
+			for (int i = 0; i < numberEvents; i++) {
+				NodeId prevId = i == 0 ? null : nodeIds[i - 1];
+				nodeIds[i] = sharedBufferAccessor.put("n" + i, eventIds[i], prevId, DeweyNumber.fromString("1.0"));
+			}
+			// only the last node is released explicitly; the final assertion expects the whole buffer to end up empty
+			NodeId lastNode = nodeIds[numberEvents - 1];
+			sharedBufferAccessor.releaseNode(lastNode);
+
+			for (int i = 0; i < numberEvents; i++) {
+				sharedBufferAccessor.releaseEvent(eventIds[i]);
+			}
+		}
+
+		assertTrue(sharedBuffer.isEmpty());
+	}
+
 }