You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:26 UTC
[09/10] flink git commit: [FLINK-9418] Separated pruning and element
processing paths
[FLINK-9418] Separated pruning and element processing paths
This closes #6059
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ee3ce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ee3ce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ee3ce9
Branch: refs/heads/master
Commit: 05ee3ce96ae51c5df9069564fd5cd2482a62de39
Parents: 9218df8
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Tue Jun 5 18:57:41 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jun 13 14:59:59 2018 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 135 ++++++++++++-------
.../org/apache/flink/cep/nfa/SharedBuffer.java | 4 +-
.../flink/cep/nfa/sharedbuffer/EventId.java | 15 ++-
.../flink/cep/nfa/sharedbuffer/Lockable.java | 6 +-
.../cep/nfa/sharedbuffer/SharedBuffer.java | 87 ++++++------
.../AbstractKeyedCEPPatternOperator.java | 43 +++---
.../org/apache/flink/cep/nfa/NFAITCase.java | 40 ++++--
.../flink/cep/nfa/NFAStatusChangeITCase.java | 7 +-
.../java/org/apache/flink/cep/nfa/NFATest.java | 3 +-
.../apache/flink/cep/nfa/NFATestUtilities.java | 3 +-
.../cep/nfa/sharedbuffer/SharedBufferTest.java | 41 ++++--
.../flink/cep/utils/TestSharedBuffer.java | 6 +
.../flink/runtime/state/KeyedStateFunction.java | 5 +-
.../api/operators/AbstractStreamOperator.java | 2 -
14 files changed, 246 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 227a34d..041a017 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -105,8 +105,7 @@ public class NFA<T> {
public NFA(
final Collection<State<T>> validStates,
final long windowTime,
- final boolean handleTimeout
- ) {
+ final boolean handleTimeout) {
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
this.states = loadStates(validStates);
@@ -169,7 +168,6 @@ public class NFA<T> {
return stateObject.isFinal();
}
-
/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned. If computations time out and timeout handling is
@@ -185,8 +183,9 @@ public class NFA<T> {
* @return Tuple of the collection of matched patterns (e.g. the result of computations which have
* reached a final state) and the collection of timed out patterns (if timeout handling is
* activated)
+ * @throws Exception Thrown if the system cannot access the state.
*/
- public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
+ public Collection<Map<String, List<T>>> process(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final T event,
@@ -210,61 +209,91 @@ public class NFA<T> {
* @return Tuple of the collection of matched patterns (e.g. the result of computations which have
* reached a final state) and the collection of timed out patterns (if timeout handling is
* activated)
+ * @throws Exception Thrown if the system cannot access the state.
*/
- public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
+ public Collection<Map<String, List<T>>> process(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
final T event,
final long timestamp,
final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-
try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBuffer)) {
return doProcess(sharedBuffer, nfaState, eventWrapper, afterMatchSkipStrategy);
}
}
- private Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> doProcess(
+ /**
+ * Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
+ * It cleares the sharedBuffer and also emits all timed out partial matches.
+ *
+ * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+ * @param nfaState The NFAState object that we need to affect while processing
+ * @param timestamp timestamp that indicates that there will be no more events with lower timestamp
+ * @return all timed outed partial matches
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
final SharedBuffer<T> sharedBuffer,
final NFAState nfaState,
- final EventWrapper event,
- final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+ final long timestamp) throws Exception {
Queue<ComputationState> computationStates = nfaState.getComputationStates();
-
- final int numberComputationStates = computationStates.size();
- final Collection<Map<String, List<T>>> result = new ArrayList<>();
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
- // iterate over all current computations
+ final int numberComputationStates = computationStates.size();
for (int i = 0; i < numberComputationStates; i++) {
ComputationState computationState = computationStates.poll();
- final Collection<ComputationState> newComputationStates;
-
- if (!isStartState(computationState) &&
- windowTime > 0L &&
- event.getTimestamp() - computationState.getStartTimestamp() >= windowTime) {
+ if (isStateTimedOut(computationState, timestamp)) {
if (handleTimeout) {
// extract the timed out event pattern
Map<String, List<T>> timedOutPattern = extractCurrentMatches(sharedBuffer, computationState);
- timeoutResult.add(Tuple2.of(timedOutPattern, event.getTimestamp()));
+ timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
}
sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
- newComputationStates = Collections.emptyList();
nfaState.setStateChanged();
- } else if (event.getEvent() != null) {
- newComputationStates = computeNextStates(sharedBuffer, computationState, event, event.getTimestamp());
-
- if (newComputationStates.size() != 1) {
- nfaState.setStateChanged();
- } else if (!newComputationStates.iterator().next().equals(computationState)) {
- nfaState.setStateChanged();
- }
} else {
- newComputationStates = Collections.singleton(computationState);
+ computationStates.add(computationState);
+ }
+ }
+
+ sharedBuffer.advanceTime(timestamp);
+
+ return timeoutResult;
+ }
+
+ private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
+ return !isStartState(state) && windowTime > 0L && timestamp - state.getStartTimestamp() >= windowTime;
+ }
+
+ private Collection<Map<String, List<T>>> doProcess(
+ final SharedBuffer<T> sharedBuffer,
+ final NFAState nfaState,
+ final EventWrapper event,
+ final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+
+ Queue<ComputationState> computationStates = nfaState.getComputationStates();
+
+ final int numberComputationStates = computationStates.size();
+ final Collection<Map<String, List<T>>> result = new ArrayList<>();
+
+ // iterate over all current computations
+ for (int i = 0; i < numberComputationStates; i++) {
+ ComputationState computationState = computationStates.poll();
+
+ final Collection<ComputationState> newComputationStates = computeNextStates(
+ sharedBuffer,
+ computationState,
+ event,
+ event.getTimestamp());
+
+ if (newComputationStates.size() != 1) {
+ nfaState.setStateChanged();
+ } else if (!newComputationStates.iterator().next().equals(computationState)) {
+ nfaState.setStateChanged();
}
//delay adding new computation states in case a stop state is reached and we discard the path.
@@ -299,13 +328,12 @@ public class NFA<T> {
} else {
computationStates.addAll(statesToRetain);
}
-
}
discardComputationStatesAccordingToStrategy(
sharedBuffer, computationStates, result, afterMatchSkipStrategy);
- return Tuple2.of(result, timeoutResult);
+ return result;
}
private void discardComputationStatesAccordingToStrategy(
@@ -500,6 +528,7 @@ public class NFA<T> {
* @param event Current event which is processed
* @param timestamp Timestamp of the current event
* @return Collection of computation states which result from the current one
+ * @throws Exception Thrown if the system cannot access the state.
*/
private Collection<ComputationState> computeNextStates(
final SharedBuffer<T> sharedBuffer,
@@ -558,22 +587,17 @@ public class NFA<T> {
final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
takeBranchesToVisit--;
- final NodeId newEntry;
+ final NodeId newEntry = sharedBuffer.put(
+ currentState.getName(),
+ event.getEventId(),
+ previousEntry,
+ currentVersion);
+
final long startTimestamp;
if (isStartState(computationState)) {
startTimestamp = timestamp;
- newEntry = sharedBuffer.put(
- currentState.getName(),
- event.getEventId(),
- previousEntry,
- currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
- newEntry = sharedBuffer.put(
- currentState.getName(),
- event.getEventId(),
- previousEntry,
- currentVersion);
}
addComputationState(
@@ -631,7 +655,10 @@ public class NFA<T> {
sharedBuffer.lockNode(previousEntry);
}
- private State<T> findFinalStateAfterProceed(SharedBuffer<T> sharedBuffer, State<T> state, T event,
+ private State<T> findFinalStateAfterProceed(
+ SharedBuffer<T> sharedBuffer,
+ State<T> state,
+ T event,
ComputationState computationState) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
@@ -661,7 +688,9 @@ public class NFA<T> {
return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
}
- private OutgoingEdges<T> createDecisionGraph(SharedBuffer<T> sharedBuffer, ComputationState computationState,
+ private OutgoingEdges<T> createDecisionGraph(
+ SharedBuffer<T> sharedBuffer,
+ ComputationState computationState,
T event) {
State<T> state = getState(computationState);
final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(state);
@@ -699,8 +728,11 @@ public class NFA<T> {
return outgoingEdges;
}
- private boolean checkFilterCondition(SharedBuffer<T> sharedBuffer, ComputationState computationState,
- IterativeCondition<T> condition, T event) throws Exception {
+ private boolean checkFilterCondition(
+ SharedBuffer<T> sharedBuffer,
+ ComputationState computationState,
+ IterativeCondition<T> condition,
+ T event) throws Exception {
return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
}
@@ -712,8 +744,10 @@ public class NFA<T> {
* @param sharedBuffer The {@link SharedBuffer} from which to extract the matches
* @param computationState The end computation state of the extracted event sequences
* @return Collection of event sequences which end in the given computation state
+ * @throws Exception Thrown if the system cannot access the state.
*/
- private Map<String, List<T>> extractCurrentMatches(final SharedBuffer<T> sharedBuffer,
+ private Map<String, List<T>> extractCurrentMatches(
+ final SharedBuffer<T> sharedBuffer,
final ComputationState computationState) throws Exception {
if (computationState.getPreviousBufferEntry() == null) {
return new HashMap<>();
@@ -819,8 +853,7 @@ public class NFA<T> {
MigratedNFA(
final Queue<ComputationState> computationStates,
- final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer
- ) {
+ final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer) {
this.sharedBuffer = sharedBuffer;
this.computationStates = computationStates;
}
@@ -838,8 +871,8 @@ public class NFA<T> {
public NFASerializerConfigSnapshot() {}
public NFASerializerConfigSnapshot(
- TypeSerializer<T> eventSerializer,
- TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
+ TypeSerializer<T> eventSerializer,
+ TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
super(eventSerializer, sharedBufferSerializer);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 7a43537..a4dbc00 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -251,7 +251,7 @@ public class SharedBuffer<V> {
Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
- Map<Long, Long> totalEventsPerTimestamp = new HashMap<>();
+ Map<Long, Integer> totalEventsPerTimestamp = new HashMap<>();
int totalPages = source.readInt();
for (int i = 0; i < totalPages; i++) {
@@ -263,7 +263,7 @@ public class SharedBuffer<V> {
ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
EventId eventId = values.get(wrapper);
if (eventId == null) {
- long id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0L);
+ int id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0);
eventId = new EventId(id, wrapper.timestamp);
values.put(wrapper, eventId);
valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 9b99ea1..57d244a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -19,6 +19,7 @@
package org.apache.flink.cep.nfa.sharedbuffer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
@@ -31,15 +32,15 @@ import java.util.Objects;
* Composite key for events in {@link SharedBuffer}.
*/
public class EventId {
- private final long id;
+ private final int id;
private final long timestamp;
- public EventId(long id, long timestamp) {
+ public EventId(int id, long timestamp) {
this.id = id;
this.timestamp = timestamp;
}
- public long getId() {
+ public int getId() {
return id;
}
@@ -110,14 +111,14 @@ public class EventId {
@Override
public void serialize(EventId record, DataOutputView target) throws IOException {
- LongSerializer.INSTANCE.serialize(record.id, target);
+ IntSerializer.INSTANCE.serialize(record.id, target);
LongSerializer.INSTANCE.serialize(record.timestamp, target);
}
@Override
public EventId deserialize(DataInputView source) throws IOException {
- Long id = LongSerializer.INSTANCE.deserialize(source);
- Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+ int id = IntSerializer.INSTANCE.deserialize(source);
+ long timestamp = LongSerializer.INSTANCE.deserialize(source);
return new EventId(id, timestamp);
}
@@ -129,7 +130,7 @@ public class EventId {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- LongSerializer.INSTANCE.copy(source, target);
+ IntSerializer.INSTANCE.copy(source, target);
LongSerializer.INSTANCE.copy(source, target);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index ca1ecae..b782d8a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -62,10 +62,6 @@ public final class Lockable<T> {
return refCounter == 0;
}
- int getRefCounter() {
- return refCounter;
- }
-
public T getElement() {
return element;
}
@@ -143,7 +139,7 @@ public final class Lockable<T> {
@Override
public Lockable<E> deserialize(DataInputView source) throws IOException {
- Integer refCount = IntSerializer.INSTANCE.deserialize(source);
+ int refCount = IntSerializer.INSTANCE.deserialize(source);
E record = elementSerializer.deserialize(source);
return new Lockable<>(record, refCount);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 50d997c..32be5da 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -18,10 +18,12 @@
package org.apache.flink.cep.nfa.sharedbuffer;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.DeweyNumber;
@@ -33,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -69,8 +72,8 @@ public class SharedBuffer<V> {
private MapState<EventId, Lockable<V>> eventsBuffer;
/** The number of events seen so far in the stream per timestamp. */
- private MapState<Long, Long> eventsCount;
- private MapState<NodeId, Lockable<SharedBufferNode>> pages;
+ private MapState<Long, Integer> eventsCount;
+ private MapState<NodeId, Lockable<SharedBufferNode>> entries;
public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
this.eventsBuffer = stateStore.getMapState(
@@ -79,7 +82,7 @@ public class SharedBuffer<V> {
EventId.EventIdSerializer.INSTANCE,
new Lockable.LockableTypeSerializer<>(valueSerializer)));
- this.pages = stateStore.getMapState(
+ this.entries = stateStore.getMapState(
new MapStateDescriptor<>(
entriesStateName,
NodeId.NodeIdSerializer.INSTANCE,
@@ -89,7 +92,24 @@ public class SharedBuffer<V> {
new MapStateDescriptor<>(
eventsCountStateName,
LongSerializer.INSTANCE,
- LongSerializer.INSTANCE));
+ IntSerializer.INSTANCE));
+ }
+
+ /**
+ * Notifies shared buffer that there will be no events with timestamp <&eq; the given value. I allows to clear
+ * internal counters for number of events seen so far per timestamp.
+ *
+ * @param timestamp watermark, no earlier events will arrive
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void advanceTime(long timestamp) throws Exception {
+ Iterator<Long> iterator = eventsCount.keys().iterator();
+ while (iterator.hasNext()) {
+ Long next = iterator.next();
+ if (next < timestamp) {
+ iterator.remove();
+ }
+ }
}
/**
@@ -104,14 +124,14 @@ public class SharedBuffer<V> {
* @throws Exception Thrown if the system cannot access the state.
*/
public EventId registerEvent(V value, long timestamp) throws Exception {
- Long id = eventsCount.get(timestamp);
+ Integer id = eventsCount.get(timestamp);
if (id == null) {
- id = 0L;
+ id = 0;
}
EventId eventId = new EventId(id, timestamp);
eventsBuffer.put(eventId, new Lockable<>(value, 1));
- eventsCount.put(timestamp, id + 1L);
+ eventsCount.put(timestamp, id + 1);
return eventId;
}
@@ -129,9 +149,9 @@ public class SharedBuffer<V> {
Map<EventId, Lockable<V>> events,
Map<NodeId, Lockable<SharedBufferNode>> entries) throws Exception {
eventsBuffer.putAll(events);
- pages.putAll(entries);
+ this.entries.putAll(entries);
- Map<Long, Long> maxIds = events.keySet().stream().collect(Collectors.toMap(
+ Map<Long, Integer> maxIds = events.keySet().stream().collect(Collectors.toMap(
EventId::getTimestamp,
EventId::getId,
Math::max
@@ -140,31 +160,13 @@ public class SharedBuffer<V> {
}
/**
- * Stores given value (value + timestamp) under the given state. It assigns no preceding element
- * relation to the entry.
- *
- * @param stateName name of the state that the event should be assigned to
- * @param eventId unique id of event assigned by this SharedBuffer
- * @param version Version of the previous relation
- * @return assigned id of this entry
- * @throws Exception Thrown if the system cannot access the state.
- */
- public NodeId put(
- final String stateName,
- final EventId eventId,
- final DeweyNumber version) throws Exception {
-
- return put(stateName, eventId, null, version);
- }
-
- /**
* Stores given value (value + timestamp) under the given state. It assigns a preceding element
* relation to the previous entry.
*
- * @param stateName name of the state that the event should be assigned to
- * @param eventId unique id of event assigned by this SharedBuffer
- * @param previousNodeId id of previous entry
- * @param version Version of the previous relation
+ * @param stateName name of the state that the event should be assigned to
+ * @param eventId unique id of event assigned by this SharedBuffer
+ * @param previousNodeId id of previous entry (might be null if start of new run)
+ * @param version Version of the previous relation
* @return assigned id of this element
* @throws Exception Thrown if the system cannot access the state.
*/
@@ -179,7 +181,7 @@ public class SharedBuffer<V> {
}
NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
- Lockable<SharedBufferNode> currentNode = pages.get(currentNodeId);
+ Lockable<SharedBufferNode> currentNode = entries.get(currentNodeId);
if (currentNode == null) {
currentNode = new Lockable<>(new SharedBufferNode(), 0);
lockEvent(eventId);
@@ -188,7 +190,7 @@ public class SharedBuffer<V> {
currentNode.getElement().addEdge(new SharedBufferEdge(
previousNodeId,
version));
- pages.put(currentNodeId, currentNode);
+ entries.put(currentNodeId, currentNode);
return currentNodeId;
}
@@ -221,7 +223,7 @@ public class SharedBuffer<V> {
Stack<ExtractionState> extractionStates = new Stack<>();
// get the starting shared buffer entry for the previous relation
- Lockable<SharedBufferNode> entryLock = pages.get(nodeId);
+ Lockable<SharedBufferNode> entryLock = entries.get(nodeId);
if (entryLock != null) {
SharedBufferNode entry = entryLock.getElement();
@@ -271,7 +273,7 @@ public class SharedBuffer<V> {
}
extractionStates.push(new ExtractionState(
- target != null ? Tuple2.of(target, pages.get(target).getElement()) : null,
+ target != null ? Tuple2.of(target, entries.get(target).getElement()) : null,
edge.getDeweyNumber(),
newPath));
}
@@ -291,10 +293,10 @@ public class SharedBuffer<V> {
* @throws Exception Thrown if the system cannot access the state.
*/
public void lockNode(final NodeId node) throws Exception {
- Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+ Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
if (sharedBufferNode != null) {
sharedBufferNode.lock();
- pages.put(node, sharedBufferNode);
+ entries.put(node, sharedBufferNode);
}
}
@@ -306,18 +308,18 @@ public class SharedBuffer<V> {
* @throws Exception Thrown if the system cannot access the state.
*/
public void releaseNode(final NodeId node) throws Exception {
- Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+ Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
if (sharedBufferNode != null) {
if (sharedBufferNode.release()) {
removeNode(node, sharedBufferNode.getElement());
} else {
- pages.put(node, sharedBufferNode);
+ entries.put(node, sharedBufferNode);
}
}
}
private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
- pages.remove(node);
+ entries.remove(node);
EventId eventId = node.getEventId();
releaseEvent(eventId);
@@ -392,4 +394,9 @@ public class SharedBuffer<V> {
}
}
+ @VisibleForTesting
+ Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
+ return eventsCount.iterator();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 73a9200..1e482c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -192,7 +192,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
if (comparator == null) {
// there can be no out of order elements in processing time
NFAState nfaState = getNFAState();
- processEvent(nfaState, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
+ long timestamp = getProcessingTimeService().getCurrentProcessingTime();
+ advanceTime(nfaState, timestamp);
+ processEvent(nfaState, element.getValue(), timestamp);
updateNFA(nfaState);
} else {
long currentTime = timerService.currentProcessingTime();
@@ -272,15 +274,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
// STEP 2
while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
long timestamp = sortedTimestamps.poll();
+ advanceTime(nfaState, timestamp);
try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
elements.forEachOrdered(
- event -> {
- try {
- processEvent(nfaState, event, timestamp);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ event -> {
+ try {
+ processEvent(nfaState, event, timestamp);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+ }
);
}
elementQueueState.remove(timestamp);
@@ -318,15 +321,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
// STEP 2
while (!sortedTimestamps.isEmpty()) {
long timestamp = sortedTimestamps.poll();
+ advanceTime(nfa, timestamp);
try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
elements.forEachOrdered(
- event -> {
- try {
- processEvent(nfa, event, timestamp);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ event -> {
+ try {
+ processEvent(nfa, event, timestamp);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
+ }
);
}
elementQueueState.remove(timestamp);
@@ -377,18 +381,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
* @param timestamp The timestamp of the event
*/
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
- Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
+ Collection<Map<String, List<IN>>> patterns =
nfa.process(partialMatches, nfaState, event, timestamp, afterMatchSkipStrategy);
- processMatchedSequences(patterns.f0, timestamp);
- processTimedOutSequences(patterns.f1, timestamp);
+ processMatchedSequences(patterns, timestamp);
}
/**
- * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
- * timeouts.
+ * Advances the time for the given NFA to the given timestamp. This means that no more events with timestamp
+ * <b>lower</b> than the given timestamp should be passed to the nfa, This can lead to pruning and timeouts.
*/
private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
- processEvent(nfaState, null, timestamp);
+ Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
+ nfa.advanceTime(partialMatches, nfaState, timestamp);
+ processTimedOutSequences(timedOut, timestamp);
}
protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 7e904c8..ae68d02 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.Collection;
@@ -49,6 +50,7 @@ import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
/**
* General tests for {@link NFA} features. See also {@link IterativeConditionsITCase}, {@link NotPatternITCase},
@@ -399,11 +401,11 @@ public class NFAITCase extends TestLogger {
NFAState nfaState = nfa.createInitialNFAState();
for (StreamRecord<Event> event: events) {
- Tuple2<Collection<Map<String, List<Event>>>, Collection<Tuple2<Map<String, List<Event>>, Long>>> patterns =
- nfa.process(sharedBuffer, nfaState, event.getValue(), event.getTimestamp());
- Collection<Map<String, List<Event>>> matchedPatterns = patterns.f0;
- Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns = patterns.f1;
+ Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
+ nfa.advanceTime(sharedBuffer, nfaState, event.getTimestamp());
+ Collection<Map<String, List<Event>>> matchedPatterns =
+ nfa.process(sharedBuffer, nfaState, event.getValue(), event.getTimestamp());
resultingPatterns.addAll(matchedPatterns);
resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2338,7 +2340,7 @@ public class NFAITCase extends TestLogger {
nfa.process(sharedBuffer, nfaState, end1, 6);
//pruning element
- nfa.process(sharedBuffer, nfaState, null, 10);
+ nfa.advanceTime(sharedBuffer, nfaState, 10);
assertEquals(1, nfaState.getComputationStates().size());
assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2382,7 +2384,7 @@ public class NFAITCase extends TestLogger {
nfa.process(sharedBuffer, nfaState, end1, 6);
//pruning element
- nfa.process(sharedBuffer, nfaState, null, 10);
+ nfa.advanceTime(sharedBuffer, nfaState, 10);
assertEquals(1, nfaState.getComputationStates().size());
assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2428,7 +2430,7 @@ public class NFAITCase extends TestLogger {
nfa.process(sharedBuffer, nfaState, end1, 6);
//pruning element
- nfa.process(sharedBuffer, nfaState, null, 10);
+ nfa.advanceTime(sharedBuffer, nfaState, 10);
assertEquals(1, nfaState.getComputationStates().size());
assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2474,7 +2476,7 @@ public class NFAITCase extends TestLogger {
nfa.process(sharedBuffer, nfaState, end1, 6);
//pruning element
- nfa.process(sharedBuffer, nfaState, null, 10);
+ nfa.advanceTime(sharedBuffer, nfaState, 10);
assertEquals(1, nfaState.getComputationStates().size());
assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2734,7 +2736,7 @@ public class NFAITCase extends TestLogger {
sharedBuffer,
nfaState,
inputEvent.getValue(),
- inputEvent.getTimestamp()).f0;
+ inputEvent.getTimestamp());
resultingPatterns.addAll(patterns);
}
@@ -2809,7 +2811,7 @@ public class NFAITCase extends TestLogger {
sharedBuffer,
nfaState,
inputEvent.getValue(),
- inputEvent.getTimestamp()).f0;
+ inputEvent.getTimestamp());
resultingPatterns.addAll(patterns);
}
@@ -2826,4 +2828,22 @@ public class NFAITCase extends TestLogger {
Assert.assertEquals(expectedOrder, resultOrder);
}
+
+ @Test
+ public void testSharedBufferClearing() throws Exception {
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end");
+
+ Event a = new Event(40, "a", 1.0);
+ Event b = new Event(41, "b", 2.0);
+
+ SharedBuffer<Event> spiedBuffer = Mockito.spy(sharedBuffer);
+ NFA<Event> nfa = compile(pattern, false);
+
+ nfa.process(spiedBuffer, nfa.createInitialNFAState(), a, 1);
+ nfa.process(spiedBuffer, nfa.createInitialNFAState(), b, 2);
+ Mockito.verify(spiedBuffer, Mockito.never()).advanceTime(anyLong());
+ nfa.advanceTime(spiedBuffer, nfa.createInitialNFAState(), 2);
+ Mockito.verify(spiedBuffer, Mockito.times(1)).advanceTime(2);
+
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index ab9c9b1..ea20bee 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -117,13 +117,13 @@ public class NFAStatusChangeITCase {
// both the queue of ComputationStatus and eventSharedBuffer have not changed
// as the timestamp is within the window
nfaState.resetStateChanged();
- nfa.process(sharedBuffer, nfaState, null, 8L);
+ nfa.advanceTime(sharedBuffer, nfaState, 8L);
assertFalse("NFA status should not change as the timestamp is within the window", nfaState.isStateChanged());
// timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will
// be removed from eventSharedBuffer as the timeout happens
nfaState.resetStateChanged();
- Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(sharedBuffer, nfaState, null, 12L).f1;
+ Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.advanceTime(sharedBuffer, nfaState, 12L);
assertTrue("NFA status should change as timeout happens", nfaState.isStateChanged() && !timeoutResults.isEmpty());
}
@@ -183,10 +183,11 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
nfaState.resetStateChanged();
+ nfa.advanceTime(sharedBuffer, nfaState, 6L);
nfa.process(sharedBuffer, nfaState, new Event(6, "start", 1.0), 6L);
nfaState.resetStateChanged();
- nfa.process(sharedBuffer, nfaState, new Event(6, "end", 1.0), 17L);
+ nfa.advanceTime(sharedBuffer, nfaState, 17L);
assertTrue(nfaState.isStateChanged());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 646455f..e626b50 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -185,11 +185,12 @@ public class NFATest extends TestLogger {
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
for (StreamRecord<Event> streamEvent : inputs) {
+ nfa.advanceTime(sharedBuffer, nfaState, streamEvent.getTimestamp());
Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
sharedBuffer,
nfaState,
streamEvent.getValue(),
- streamEvent.getTimestamp()).f0;
+ streamEvent.getTimestamp());
actualPatterns.addAll(matchedPatterns);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index 00a3bfd..58ba224 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -66,12 +66,13 @@ public class NFATestUtilities {
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
for (StreamRecord<Event> inputEvent : inputEvents) {
+ nfa.advanceTime(sharedBuffer, nfaState, inputEvent.getTimestamp());
Collection<Map<String, List<Event>>> patterns = nfa.process(
sharedBuffer,
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
- afterMatchSkipStrategy).f0;
+ afterMatchSkipStrategy);
for (Map<String, List<Event>> p: patterns) {
List<Event> res = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 9be7cc1..342c9ef 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -93,9 +94,9 @@ public class SharedBufferTest extends TestLogger {
expectedPattern3.put("b", new ArrayList<>());
expectedPattern3.get("b").add(events[7]);
- NodeId a10 = sharedBuffer.put("a1", eventIds[0], DeweyNumber.fromString("1"));
+ NodeId a10 = sharedBuffer.put("a1", eventIds[0], null, DeweyNumber.fromString("1"));
NodeId aLoop0 = sharedBuffer.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
- NodeId a11 = sharedBuffer.put("a1", eventIds[2], DeweyNumber.fromString("2"));
+ NodeId a11 = sharedBuffer.put("a1", eventIds[2], null, DeweyNumber.fromString("2"));
NodeId aLoop1 = sharedBuffer.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
NodeId aLoop2 = sharedBuffer.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
NodeId aSecondLoop0 = sharedBuffer.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
@@ -106,12 +107,16 @@ public class SharedBufferTest extends TestLogger {
NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
- Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+ Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3,
+ DeweyNumber.fromString("1.1.0"));
sharedBuffer.releaseNode(b3);
- Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+ Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3,
+ DeweyNumber.fromString("1.1.0"));
- Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1, DeweyNumber.fromString("2.0.0"));
- Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0, DeweyNumber.fromString("1.0.0"));
+ Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1,
+ DeweyNumber.fromString("2.0.0"));
+ Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0,
+ DeweyNumber.fromString("1.0.0"));
sharedBuffer.releaseNode(b0);
sharedBuffer.releaseNode(b1);
@@ -144,7 +149,7 @@ public class SharedBufferTest extends TestLogger {
eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
}
- NodeId start = sharedBuffer.put("start", eventIds[1], DeweyNumber.fromString("1"));
+ NodeId start = sharedBuffer.put("start", eventIds[1], null, DeweyNumber.fromString("1"));
NodeId b0 = sharedBuffer.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
NodeId b1 = sharedBuffer.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
NodeId b00 = sharedBuffer.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
@@ -189,7 +194,7 @@ public class SharedBufferTest extends TestLogger {
expectedResult.put("c", new ArrayList<>());
expectedResult.get("c").add(events[4]);
- NodeId a = sharedBuffer.put("a", eventIds[0], DeweyNumber.fromString("1"));
+ NodeId a = sharedBuffer.put("a", eventIds[0], null, DeweyNumber.fromString("1"));
NodeId b = sharedBuffer.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
NodeId aa = sharedBuffer.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
@@ -213,4 +218,24 @@ public class SharedBufferTest extends TestLogger {
assertEquals(expectedOrder, resultOrder);
}
+ @Test
+ public void testSharedBufferCountersClearing() throws Exception {
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+ int numberEvents = 4;
+ Event[] events = new Event[numberEvents];
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ sharedBuffer.registerEvent(events[i], i);
+ }
+
+ sharedBuffer.advanceTime(3);
+
+ Iterator<Map.Entry<Long, Integer>> counters = sharedBuffer.getEventCounters();
+ Map.Entry<Long, Integer> entry = counters.next();
+ assertEquals(3, entry.getKey().longValue());
+ assertEquals(1, entry.getValue().intValue());
+ assertFalse(counters.hasNext());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 2c7b979..4d510cf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -245,6 +245,12 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
stateReads++;
return iterator.next();
}
+
+ @Override
+ public void remove() {
+ stateWrites++;
+ iterator.remove();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
index 5adce4d..b125de9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.state.State;
@@ -22,8 +23,8 @@ import org.apache.flink.api.common.state.State;
/**
* A function to be applied to all keyed states.
*
- * <p>This functionality is only available through the
- * {@code BroadcastConnectedStream.process(final KeyedBroadcastProcessFunction function)}.
+ * @param <K> The type of key.
+ * @param <S> The type of state.
*/
public abstract class KeyedStateFunction<K, S extends State> {
http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index c193416..9915dd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -396,8 +396,6 @@ public abstract class AbstractStreamOperator<OUT>
return snapshotInProgress;
}
-
-
/**
* Stream operators with state, which want to participate in a snapshot need to override this hook method.
*