You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 06:44:55 UTC
[flink] branch master updated: [FLINK-12319][Library/CEP]Change the
logic of releasing node from recursive to non-recursive
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f130e4b [FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
f130e4b is described below
commit f130e4bc259746b1542a2f4d8907d3b35195feb8
Author: liyafan82 <42...@users.noreply.github.com>
AuthorDate: Fri Jul 5 14:44:45 2019 +0800
[FLINK-12319][Library/CEP]Change the logic of releasing node from recursive to non-recursive
---
.../cep/nfa/sharedbuffer/SharedBufferAccessor.java | 45 +++++++++++-----------
.../cep/nfa/sharedbuffer/SharedBufferTest.java | 38 ++++++++++++++++++
2 files changed, 61 insertions(+), 22 deletions(-)
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
index 2613f9d..c92df1b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
@@ -235,31 +235,32 @@ public class SharedBufferAccessor<V> implements AutoCloseable {
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseNode(final NodeId node) throws Exception {
- Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node);
- if (sharedBufferNode != null) {
- if (sharedBufferNode.release()) {
- removeNode(node, sharedBufferNode.getElement());
- } else {
- sharedBuffer.upsertEntry(node, sharedBufferNode);
+ // the stack used to detect all nodes that needs to be released.
+ Stack<NodeId> nodesToExamine = new Stack<>();
+ nodesToExamine.push(node);
+
+ while (!nodesToExamine.isEmpty()) {
+ NodeId curNode = nodesToExamine.pop();
+ Lockable<SharedBufferNode> curBufferNode = sharedBuffer.getEntry(curNode);
+
+ if (curBufferNode == null) {
+ break;
}
- }
- }
- /**
- * Removes the {@code SharedBufferNode}, when the ref is decreased to zero, and also
- * decrease the ref of the edge on this node.
- *
- * @param node id of the entry
- * @param sharedBufferNode the node body to be removed
- * @throws Exception Thrown if the system cannot access the state.
- */
- private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
- sharedBuffer.removeEntry(node);
- EventId eventId = node.getEventId();
- releaseEvent(eventId);
+ if (curBufferNode.release()) {
+ // first release the current node
+ sharedBuffer.removeEntry(curNode);
+ releaseEvent(curNode.getEventId());
- for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
- releaseNode(sharedBufferEdge.getTarget());
+ for (SharedBufferEdge sharedBufferEdge : curBufferNode.getElement().getEdges()) {
+ NodeId targetId = sharedBufferEdge.getTarget();
+ if (targetId != null) {
+ nodesToExamine.push(targetId);
+ }
+ }
+ } else {
+ sharedBuffer.upsertEntry(curNode, curBufferNode);
+ }
}
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 0583e8b..abd5b8b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -280,4 +280,42 @@ public class SharedBufferTest extends TestLogger {
assertEquals(4, sharedBuffer.getSharedBufferNodeSize());
}
+ /**
+ * Test releasing a node which has a long path to the terminal node (the node without an out-going edge).
+ * @throws Exception if creating the shared buffer accessor fails.
+ */
+ @Test
+ public void testReleaseNodesWithLongPath() throws Exception {
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+
+ final int numberEvents = 100000;
+ Event[] events = new Event[numberEvents];
+ EventId[] eventIds = new EventId[numberEvents];
+ NodeId[] nodeIds = new NodeId[numberEvents];
+
+ final long timestamp = 1L;
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+ }
+
+ try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+
+ for (int i = 0; i < numberEvents; i++) {
+ NodeId prevId = i == 0 ? null : nodeIds[i - 1];
+ nodeIds[i] = sharedBufferAccessor.put("n" + i, eventIds[i], prevId, DeweyNumber.fromString("1.0"));
+ }
+
+ NodeId lastNode = nodeIds[numberEvents - 1];
+ sharedBufferAccessor.releaseNode(lastNode);
+
+ for (int i = 0; i < numberEvents; i++) {
+ sharedBufferAccessor.releaseEvent(eventIds[i]);
+ }
+ }
+
+ assertTrue(sharedBuffer.isEmpty());
+ }
+
}