Posted to commits@flink.apache.org by dw...@apache.org on 2018/01/02 08:28:50 UTC

flink git commit: [FLINK-8226] [cep] Fix dangling references left after the NFA cleans up a timed-out SharedBufferEntry

Repository: flink
Updated Branches:
  refs/heads/master 3fdee00e4 -> 9e3bac39e


[FLINK-8226] [cep] Fix dangling references left after the NFA cleans up a timed-out SharedBufferEntry

This closes #5141


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e3bac39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e3bac39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e3bac39

Branch: refs/heads/master
Commit: 9e3bac39e01722d22e3c08bef593987244609a72
Parents: 3fdee00
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Sat Dec 9 10:55:14 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jan 2 09:27:17 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 48 +++++++++++++++-----
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 34 ++++++++++++++
 2 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e3bac39/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 29e8dc2..542604a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -191,14 +191,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 */
 	public boolean prune(long pruningTimestamp) {
 		Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
-		boolean pruned = false;
+		List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
 
 		while (iter.hasNext()) {
 			SharedBufferPage<K, V> page = iter.next().getValue();
 
-			if (page.prune(pruningTimestamp)) {
-				pruned = true;
-			}
+			page.prune(pruningTimestamp, prunedEntries);
 
 			if (page.isEmpty()) {
 				// delete page if it is empty
@@ -206,7 +204,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			}
 		}
 
-		return pruned;
+		if (!prunedEntries.isEmpty()) {
+			for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
+				entry.getValue().removeEdges(prunedEntries);
+			}
+			return true;
+		} else {
+			return false;
+		}
 	}
 
 	/**
@@ -451,25 +456,21 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		 * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
 		 *
 		 * @param pruningTimestamp Timestamp for the pruning
-		 * @return {@code true} if pruning happened
 		 */
-		public boolean prune(long pruningTimestamp) {
+		public void prune(long pruningTimestamp, List<SharedBufferEntry<K, V>> prunedEntries) {
 			Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
 			boolean continuePruning = true;
-			boolean pruned = false;
 
 			while (iterator.hasNext() && continuePruning) {
 				SharedBufferEntry<K, V> entry = iterator.next().getValue();
 
 				if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
+					prunedEntries.add(entry);
 					iterator.remove();
-					pruned = true;
 				} else {
 					continuePruning = false;
 				}
 			}
-
-			return pruned;
 		}
 
 		public boolean isEmpty() {
@@ -480,6 +481,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			return entries.remove(valueTime);
 		}
 
+		/**
+		 * Remove edges with the specified targets for the entries.
+		 */
+		private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
+			for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entry : entries.entrySet()) {
+				entry.getValue().removeEdges(prunedEntries);
+			}
+		}
+
 		@Override
 		public String toString() {
 			StringBuilder builder = new StringBuilder();
@@ -569,6 +579,22 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 			edges.add(edge);
 		}
 
+		/**
+		 * Remove edges with the specified targets.
+		 */
+		private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
+			Iterator<SharedBufferEdge<K, V>> itor = edges.iterator();
+			while (itor.hasNext()) {
+				SharedBufferEdge<K, V> edge = itor.next();
+				for (SharedBufferEntry<K, V> prunedEntry : prunedEntries) {
+					if (prunedEntry == edge.getTarget()) {
+						itor.remove();
+						break;
+					}
+				}
+			}
+		}
+
 		public boolean remove() {
 			if (page != null) {
 				page.remove(valueTime);

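The patch above turns pruning into a two-pass operation: first the expired SharedBufferEntry instances are removed from their pages and collected, then every surviving entry drops the edges whose target is one of the collected (now removed) entries, so no dangling reference survives the prune. Below is a minimal, standalone sketch of that idea; the class and field names are illustrative only and are not the actual Flink CEP types.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Standalone sketch of the two-pass prune shown in the diff above; the names
 * here are illustrative and do not match the real Flink CEP classes.
 */
public class TwoPassPruneSketch {

	static class Entry {
		final String id;
		final long timestamp;
		final List<Entry> edgeTargets = new ArrayList<>();

		Entry(String id, long timestamp) {
			this.id = id;
			this.timestamp = timestamp;
		}
	}

	static boolean prune(Map<String, Entry> entries, long pruningTimestamp) {
		// Pass 1: remove expired entries, remembering them instead of only a boolean flag.
		// (The real code can stop at the first non-expired entry because its entries are
		// ordered by timestamp; this sketch simply scans everything.)
		List<Entry> prunedEntries = new ArrayList<>();
		Iterator<Map.Entry<String, Entry>> iter = entries.entrySet().iterator();
		while (iter.hasNext()) {
			Entry entry = iter.next().getValue();
			if (entry.timestamp <= pruningTimestamp) {
				prunedEntries.add(entry);
				iter.remove();
			}
		}

		if (prunedEntries.isEmpty()) {
			return false;
		}

		// Pass 2: drop edges in the surviving entries that still target a pruned entry,
		// so no dangling reference is left behind.
		for (Entry survivor : entries.values()) {
			survivor.edgeTargets.removeIf(prunedEntries::contains);
		}
		return true;
	}

	public static void main(String[] args) {
		Map<String, Entry> entries = new HashMap<>();
		Entry old = new Entry("old", 100L);
		Entry young = new Entry("young", 200L);
		young.edgeTargets.add(old); // an edge pointing at an entry that is about to expire

		entries.put(old.id, old);
		entries.put(young.id, young);

		prune(entries, 150L);
		// Without pass 2, 'young' would still reference the removed 'old' entry.
		System.out.println("dangling edges left: " + young.edgeTargets.size()); // prints 0
	}
}

The patch compares targets by identity (prunedEntry == edge.getTarget()); the sketch's removeIf(prunedEntries::contains) behaves the same way here because Entry does not override equals.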
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3bac39/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 0f4066f..2a12d37 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -176,6 +177,26 @@ public class NFATest extends TestLogger {
 		assertEquals(expectedPatterns, actualPatterns);
 	}
 
+	@Test
+	public void testTimeoutWindowPruning2() throws IOException {
+		NFA<Event> nfa = createLoopingNFA(2);
+		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+		streamEvents.add(new StreamRecord<>(new Event(1, "loop", 1.0), 101L));
+		streamEvents.add(new StreamRecord<>(new Event(2, "loop", 2.0), 102L));
+		streamEvents.add(new StreamRecord<>(new Event(3, "loop", 3.0), 103L));
+		streamEvents.add(new StreamRecord<>(new Event(4, "loop", 4.0), 104L));
+		streamEvents.add(new StreamRecord<>(new Event(5, "loop", 5.0), 105L));
+		runNFA(nfa, streamEvents);
+
+		NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+
+		//serialize
+		ByteArrayOutputStream baos = new ByteArrayOutputStream();
+		serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+		baos.close();
+	}
+
 	public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
 		Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
 
@@ -358,4 +379,17 @@ public class NFATest extends TestLogger {
 
 		return nfa;
 	}
+
+	private NFA<Event> createLoopingNFA(long windowLength) {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("loop").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("loop");
+			}
+		}).timesOrMore(3).within(Time.milliseconds(windowLength));
+
+		return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+	}
 }
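
The new test runs events through a timesOrMore(3) pattern with a 2 ms window, so the window expires, entries get pruned, and the NFA is then serialized: any edge still pointing at a pruned entry would only surface at that point. The following is a rough sketch of why a dangling edge tends to break at serialization time; the exact failure inside NFA.NFASerializer may differ, this is an assumption for illustration.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hedged sketch of the failure mode the new test guards against; the real
 * NFA.NFASerializer is more involved and may fail differently.
 */
public class DanglingEdgeSketch {

	static class Entry {
		final String name;
		final List<Entry> edgeTargets = new ArrayList<>();

		Entry(String name) {
			this.name = name;
		}
	}

	public static void main(String[] args) {
		Entry kept = new Entry("kept");
		Entry pruned = new Entry("pruned");
		kept.edgeTargets.add(pruned);

		// A serializer typically assigns an index to every entry it still finds in the
		// buffer; 'pruned' has been removed from the pages, so it never receives one.
		Map<Entry, Integer> entryIds = new HashMap<>();
		entryIds.put(kept, 0);

		// Each edge is then written as the index of its target. A dangling edge has no
		// index, leaving the serialized state inconsistent or failing at write time.
		for (Entry target : kept.edgeTargets) {
			Integer targetId = entryIds.get(target);
			System.out.println("edge target id: " + targetId); // prints null
		}
	}
}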