You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/01/02 08:28:50 UTC
flink git commit: [FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry
Repository: flink
Updated Branches:
refs/heads/master 3fdee00e4 -> 9e3bac39e
[FLINK-8226] [cep] Dangling reference generated after NFA clean up timed out SharedBufferEntry
This closes #5141
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e3bac39
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e3bac39
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e3bac39
Branch: refs/heads/master
Commit: 9e3bac39e01722d22e3c08bef593987244609a72
Parents: 3fdee00
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Sat Dec 9 10:55:14 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jan 2 09:27:17 2018 +0100
----------------------------------------------------------------------
.../org/apache/flink/cep/nfa/SharedBuffer.java | 48 +++++++++++++++-----
.../java/org/apache/flink/cep/nfa/NFATest.java | 34 ++++++++++++++
2 files changed, 71 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3bac39/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 29e8dc2..542604a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -191,14 +191,12 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
*/
public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
- boolean pruned = false;
+ List<SharedBufferEntry<K, V>> prunedEntries = new ArrayList<>();
while (iter.hasNext()) {
SharedBufferPage<K, V> page = iter.next().getValue();
- if (page.prune(pruningTimestamp)) {
- pruned = true;
- }
+ page.prune(pruningTimestamp, prunedEntries);
if (page.isEmpty()) {
// delete page if it is empty
@@ -206,7 +204,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
}
- return pruned;
+ if (!prunedEntries.isEmpty()) {
+ for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
+ entry.getValue().removeEdges(prunedEntries);
+ }
+ return true;
+ } else {
+ return false;
+ }
}
/**
@@ -451,25 +456,21 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
* Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
*
* @param pruningTimestamp Timestamp for the pruning
- * @return {@code true} if pruning happened
*/
- public boolean prune(long pruningTimestamp) {
+ public void prune(long pruningTimestamp, List<SharedBufferEntry<K, V>> prunedEntries) {
Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
boolean continuePruning = true;
- boolean pruned = false;
while (iterator.hasNext() && continuePruning) {
SharedBufferEntry<K, V> entry = iterator.next().getValue();
if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
+ prunedEntries.add(entry);
iterator.remove();
- pruned = true;
} else {
continuePruning = false;
}
}
-
- return pruned;
}
public boolean isEmpty() {
@@ -480,6 +481,15 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
return entries.remove(valueTime);
}
+ /**
+ * Removes, from every entry of this page, all edges whose target is one of the given pruned entries.
+ */
+ private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
+ for (Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entry : entries.entrySet()) {
+ entry.getValue().removeEdges(prunedEntries);
+ }
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -569,6 +579,22 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
edges.add(edge);
}
+ /**
+ * Removes every edge of this entry whose target is, by reference identity, one of the given pruned entries.
+ */
+ private void removeEdges(final List<SharedBufferEntry<K, V>> prunedEntries) {
+ Iterator<SharedBufferEdge<K, V>> itor = edges.iterator();
+ while (itor.hasNext()) {
+ SharedBufferEdge<K, V> edge = itor.next();
+ for (SharedBufferEntry<K, V> prunedEntry : prunedEntries) {
+ if (prunedEntry == edge.getTarget()) {
+ itor.remove();
+ break;
+ }
+ }
+ }
+ }
+
public boolean remove() {
if (page != null) {
page.remove(valueTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/9e3bac39/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 0f4066f..2a12d37 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
@@ -176,6 +177,26 @@ public class NFATest extends TestLogger {
assertEquals(expectedPatterns, actualPatterns);
}
+ @Test
+ public void testTimeoutWindowPruning2() throws IOException {
+ NFA<Event> nfa = createLoopingNFA(2);
+ List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+ streamEvents.add(new StreamRecord<>(new Event(1, "loop", 1.0), 101L));
+ streamEvents.add(new StreamRecord<>(new Event(2, "loop", 2.0), 102L));
+ streamEvents.add(new StreamRecord<>(new Event(3, "loop", 3.0), 103L));
+ streamEvents.add(new StreamRecord<>(new Event(4, "loop", 4.0), 104L));
+ streamEvents.add(new StreamRecord<>(new Event(5, "loop", 5.0), 105L));
+ runNFA(nfa, streamEvents);
+
+ NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+
+ // serialize the NFA; the test makes no assertions, so it only fails if a call such as this one throws
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+ baos.close();
+ }
+
public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
@@ -358,4 +379,17 @@ public class NFATest extends TestLogger {
return nfa;
}
+
+ private NFA<Event> createLoopingNFA(long windowLength) {
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("loop").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("loop");
+ }
+ }).timesOrMore(3).within(Time.milliseconds(windowLength));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ }
}