You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:11 UTC
[flink] 03/05: [FLINK-10596][cep] Added access to timer service in
IterativeCondition
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 60f8826de75bdb1294ca4cd26b2af155de62c0df
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:17:40 2018 +0100
[FLINK-10596][cep] Added access to timer service in IterativeCondition
---
...ernStreamScalaJavaAPIInteroperabilityTest.scala | 2 +-
.../cep/functions/PatternProcessFunction.java | 2 +-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 84 ++++++++++----------
.../org/apache/flink/cep/operator/CepOperator.java | 20 ++++-
.../cep/pattern/conditions/IterativeCondition.java | 5 +-
.../flink/cep/{context => time}/TimerContext.java | 6 +-
.../TimerContext.java => time/TimerService.java} | 22 ++----
.../java/org/apache/flink/cep/nfa/NFAITCase.java | 13 +++-
.../nfa/NFAIterativeConditionTimerContextTest.java | 91 ++++++++++++++++++++++
.../flink/cep/nfa/NFAStatusChangeITCase.java | 21 ++---
.../apache/flink/cep/operator/CEPOperatorTest.java | 35 +++++++++
.../org/apache/flink/cep/utils/NFATestHarness.java | 35 ++++++---
.../apache/flink/cep/utils/TestTimerService.java} | 36 ++++-----
13 files changed, 266 insertions(+), 106 deletions(-)
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index 5f6055d..80d96ac 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -150,7 +150,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
outputs.getOrElseUpdate(outputTag, ListBuffer.empty).append(value)
}
- override def timestamp(): lang.Long = null
+ override def timestamp(): Long = 0
override def currentProcessingTime(): Long = System.currentTimeMillis()
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
index b392501..b8e6821 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.cep.functions;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.cep.context.TimerContext;
+import org.apache.flink.cep.time.TimerContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 48bb587..869a641 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -38,6 +38,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -211,32 +212,8 @@ public class NFA<T> {
* @param nfaState The NFAState object that we need to affect while processing
* @param event The current event to be processed or null if only pruning shall be done
* @param timestamp The timestamp of the current event
- * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
- * reached a final state) and the collection of timed out patterns (if timeout handling is
- * activated)
- * @throws Exception Thrown if the system cannot access the state.
- */
- public Collection<Map<String, List<T>>> process(
- final SharedBufferAccessor<T> sharedBufferAccessor,
- final NFAState nfaState,
- final T event,
- final long timestamp) throws Exception {
- return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
- }
-
- /**
- * Processes the next input event. If some of the computations reach a final state then the
- * resulting event sequences are returned. If computations time out and timeout handling is
- * activated, then the timed out event patterns are returned.
- *
- * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
- * with the element that resulted in the stop state.
- *
- * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
- * @param nfaState The NFAState object that we need to affect while processing
- * @param event The current event to be processed or null if only pruning shall be done
- * @param timestamp The timestamp of the current event
* @param afterMatchSkipStrategy The skip strategy to use after per match
+ * @param timerService gives access to processing time and time characteristic, needed for condition evaluation
* @return Tuple of the collection of matched patterns (e.g. the result of computations which have
* reached a final state) and the collection of timed out patterns (if timeout handling is
* activated)
@@ -247,9 +224,10 @@ public class NFA<T> {
final NFAState nfaState,
final T event,
final long timestamp,
- final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+ final AfterMatchSkipStrategy afterMatchSkipStrategy,
+ final TimerService timerService) throws Exception {
try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
- return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
+ return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy, timerService);
}
}
@@ -305,7 +283,8 @@ public class NFA<T> {
final SharedBufferAccessor<T> sharedBufferAccessor,
final NFAState nfaState,
final EventWrapper event,
- final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+ final AfterMatchSkipStrategy afterMatchSkipStrategy,
+ final TimerService timerService) throws Exception {
final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
@@ -316,7 +295,7 @@ public class NFA<T> {
sharedBufferAccessor,
computationState,
event,
- event.getTimestamp());
+ timerService);
if (newComputationStates.size() != 1) {
nfaState.setStateChanged();
@@ -555,7 +534,7 @@ public class NFA<T> {
* @param sharedBufferAccessor The accessor to shared buffer that we need to change
* @param computationState Current computation state
* @param event Current event which is processed
- * @param timestamp Timestamp of the current event
+ * @param timerService timer service which provides access to time related features
* @return Collection of computation states which result from the current one
* @throws Exception Thrown if the system cannot access the state.
*/
@@ -563,9 +542,13 @@ public class NFA<T> {
final SharedBufferAccessor<T> sharedBufferAccessor,
final ComputationState computationState,
final EventWrapper event,
- final long timestamp) throws Exception {
+ final TimerService timerService) throws Exception {
- final ConditionContext<T> context = new ConditionContext<>(this, sharedBufferAccessor, computationState);
+ final ConditionContext context = new ConditionContext(
+ sharedBufferAccessor,
+ computationState,
+ timerService,
+ event.getTimestamp());
final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
@@ -628,7 +611,7 @@ public class NFA<T> {
final long startTimestamp;
final EventId startEventId;
if (isStartState(computationState)) {
- startTimestamp = timestamp;
+ startTimestamp = event.getTimestamp();
startEventId = event.getEventId();
} else {
startTimestamp = computationState.getStartTimestamp();
@@ -694,7 +677,7 @@ public class NFA<T> {
}
private State<T> findFinalStateAfterProceed(
- ConditionContext<T> context,
+ ConditionContext context,
State<T> state,
T event) {
final Stack<State<T>> statesToCheck = new Stack<>();
@@ -725,7 +708,7 @@ public class NFA<T> {
}
private OutgoingEdges<T> createDecisionGraph(
- ConditionContext<T> context,
+ ConditionContext context,
ComputationState computationState,
T event) {
State<T> state = getState(computationState);
@@ -765,7 +748,7 @@ public class NFA<T> {
}
private boolean checkFilterCondition(
- ConditionContext<T> context,
+ ConditionContext context,
IterativeCondition<T> condition,
T event) throws Exception {
return condition == null || condition.filter(event, context);
@@ -804,7 +787,11 @@ public class NFA<T> {
/**
* The context used when evaluating this computation state.
*/
- private static class ConditionContext<T> implements IterativeCondition.Context<T> {
+ private class ConditionContext implements IterativeCondition.Context<T> {
+
+ private final TimerService timerService;
+
+ private final long eventTimestamp;
/** The current computation state. */
private ComputationState computationState;
@@ -816,17 +803,17 @@ public class NFA<T> {
*/
private Map<String, List<T>> matchedEvents;
- private NFA<T> nfa;
-
private SharedBufferAccessor<T> sharedBufferAccessor;
ConditionContext(
- final NFA<T> nfa,
final SharedBufferAccessor<T> sharedBufferAccessor,
- final ComputationState computationState) {
+ final ComputationState computationState,
+ final TimerService timerService,
+ final long eventTimestamp) {
this.computationState = computationState;
- this.nfa = nfa;
this.sharedBufferAccessor = sharedBufferAccessor;
+ this.timerService = timerService;
+ this.eventTimestamp = eventTimestamp;
}
@Override
@@ -837,7 +824,8 @@ public class NFA<T> {
// this is to avoid any overheads when using a simple, non-iterative condition.
if (matchedEvents == null) {
- this.matchedEvents = sharedBufferAccessor.materializeMatch(nfa.extractCurrentMatches(sharedBufferAccessor,
+ this.matchedEvents = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
+ sharedBufferAccessor,
computationState));
}
@@ -851,6 +839,16 @@ public class NFA<T> {
}
};
}
+
+ @Override
+ public long timestamp() {
+ return eventTimestamp;
+ }
+
+ @Override
+ public long currentProcessingTime() {
+ return timerService.currentProcessingTime();
+ }
}
//////////////////// DEPRECATED/MIGRATION UTILS
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 262d89d..357fbc7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.time.TimerService;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -130,6 +131,9 @@ public class CepOperator<IN, KEY, OUT>
/** Wrapped RuntimeContext that limits the underlying context features. */
private transient CepRuntimeContext cepRuntimeContext;
+ /** Thin context passed to NFA that gives access to time related characteristics. */
+ private transient TimerService cepTimerService;
+
public CepOperator(
final TypeSerializer<IN> inputSerializer,
final boolean isProcessingTime,
@@ -216,6 +220,7 @@ public class CepOperator<IN, KEY, OUT>
context = new ContextFunctionImpl();
collector = new TimestampedCollector<>(output);
+ cepTimerService = new TimerServiceImpl();
}
@Override
@@ -415,7 +420,7 @@ public class CepOperator<IN, KEY, OUT>
private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
Collection<Map<String, List<IN>>> patterns =
- nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
processMatchedSequences(patterns, timestamp);
}
}
@@ -465,6 +470,19 @@ public class CepOperator<IN, KEY, OUT>
}
/**
+ * Gives {@link NFA} access to {@link InternalTimerService} and tells if {@link CepOperator} works in
+ * processing time. Should be instantiated once per operator.
+ */
+ private class TimerServiceImpl implements TimerService {
+
+ @Override
+ public long currentProcessingTime() {
+ return timerService.currentProcessingTime();
+ }
+
+ }
+
+ /**
* Implementation of {@link PatternProcessFunction.Context}. Design to be instantiated once per operator.
* It serves three methods:
* <ul>
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index f716ec4..786cb08 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -18,7 +18,9 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.cep.time.TimerContext;
import java.io.Serializable;
@@ -59,6 +61,7 @@ import java.io.Serializable;
* the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary,
* so when implementing your condition, try to minimize the times the method is called.
*/
+@PublicEvolving
public abstract class IterativeCondition<T> implements Function, Serializable {
private static final long serialVersionUID = 7067817235759351255L;
@@ -84,7 +87,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
/**
* The context used when evaluating the {@link IterativeCondition condition}.
*/
- public interface Context<T> {
+ public interface Context<T> extends TimerContext {
/**
* @return An {@link Iterable} over the already accepted elements
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
similarity index 91%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
index 4f0eab2..9b5db52 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.cep.functions.PatternProcessFunction;
@@ -32,8 +32,8 @@ public interface TimerContext {
/**
* Timestamp of the element currently being processed.
*
- * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
- * time when event entered the cep operator.
+ * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this means the
+ * time when the event entered the cep operator.
*/
long timestamp();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
similarity index 58%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
index 4f0eab2..464e267 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
@@ -16,28 +16,20 @@
* limitations under the License.
*/
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.annotation.Internal;
/**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Enables to provide time characteristic to {@link org.apache.flink.cep.nfa.NFA} for use in
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
*/
-@PublicEvolving
-public interface TimerContext {
+@Internal
+public interface TimerService {
/**
- * Timestamp of the element currently being processed.
- *
- * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
- * time when event entered the cep operator.
+ * Current processing time as returned from {@link org.apache.flink.streaming.api.TimerService}.
*/
- long timestamp();
-
- /** Returns the current processing time. */
long currentProcessingTime();
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 62d76db..4b1f627 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
@@ -420,7 +421,8 @@ public class NFAITCase extends TestLogger {
nfaState,
event.getValue(),
event.getTimestamp(),
- AfterMatchSkipStrategy.noSkip());
+ AfterMatchSkipStrategy.noSkip(),
+ new TestTimerService());
resultingPatterns.addAll(matchedPatterns);
resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2830,10 +2832,13 @@ public class NFAITCase extends TestLogger {
Event a = new Event(40, "a", 1.0);
Event b = new Event(41, "b", 2.0);
+ NFA<Event> nfa = compile(pattern, false);
+ TestTimerService timerService = new TestTimerService();
try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
- NFA<Event> nfa = compile(pattern, false);
- nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
- nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
+ nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip(),
+ timerService);
+ nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip(),
+ timerService);
Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
new file mode 100644
index 0000000..3f9b26a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
+import org.apache.flink.cep.utils.TestTimerService;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.apache.flink.cep.utils.NFATestHarness.forPattern;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+
+/**
+ * Tests for accesing time properties from {@link IterativeCondition}.
+ */
+public class NFAIterativeConditionTimerContextTest extends TestLogger {
+
+ @Test
+ public void testEventTimestamp() throws Exception {
+ final Event event = event().withId(1).build();
+ final long timestamp = 3;
+
+ final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ return ctx.timestamp() == timestamp;
+ }
+ });
+
+ final NFATestHarness testHarness = forPattern(pattern).build();
+
+ final List<List<Event>> resultingPattern = testHarness.feedRecord(new StreamRecord<>(event, timestamp));
+
+ compareMaps(resultingPattern, Collections.singletonList(
+ Collections.singletonList(event)
+ ));
+ }
+
+ @Test
+ public void testCurrentProcessingTime() throws Exception {
+ final Event event1 = event().withId(1).build();
+ final Event event2 = event().withId(2).build();
+
+ final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ return ctx.currentProcessingTime() == 3;
+ }
+ });
+
+ final TestTimerService cepTimerService = new TestTimerService();
+ final NFATestHarness testHarness = forPattern(pattern)
+ .withTimerService(cepTimerService)
+ .build();
+
+ cepTimerService.setCurrentProcessingTime(1);
+ final List<List<Event>> resultingPatterns1 = testHarness.feedRecord(new StreamRecord<>(event1, 7));
+ cepTimerService.setCurrentProcessingTime(3);
+ final List<List<Event>> resultingPatterns2 = testHarness.feedRecord(new StreamRecord<>(event2, 8));
+
+ compareMaps(resultingPatterns1, Collections.emptyList());
+ compareMaps(resultingPatterns2, Collections.singletonList(
+ Collections.singletonList(event2)
+ ));
+ }
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index d0a7853..c5e6682 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -26,7 +26,9 @@ import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.junit.After;
@@ -49,6 +51,7 @@ public class NFAStatusChangeITCase {
private SharedBuffer<Event> sharedBuffer;
private SharedBufferAccessor<Event> sharedBufferAccessor;
private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
+ private TimerService timerService = new TestTimerService();
@Before
public void init() {
@@ -97,32 +100,32 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
- nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy, timerService);
assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy, timerService);
assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
// the status of the queue of ComputationStatus changed,
// more than one ComputationStatus is generated by the event from some ComputationStatus
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy, timerService);
assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy, timerService);
assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy, timerService);
assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy, timerService);
assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -166,10 +169,10 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy, timerService);
assertTrue(nfaState.isStateChanged());
}
@@ -195,7 +198,7 @@ public class NFAStatusChangeITCase {
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 2e4b41d..f496dc8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,10 +30,14 @@ import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -64,10 +68,16 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
+import static org.apache.flink.cep.utils.CepOperatorBuilder.createOperatorForNFA;
+import static org.apache.flink.cep.utils.EventBuilder.event;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.validateMockitoUsage;
+import static org.mockito.Mockito.verify;
/**
* Tests for {@link CepOperator}.
@@ -101,6 +111,31 @@ public class CEPOperatorTest extends TestLogger {
}
@Test
+ public void testProcessingTimestampisPassedToNFA() throws Exception {
+
+ final NFA<Event> nfa = NFACompiler.compileFactory(Pattern.<Event>begin("begin"), true).createNFA();
+ final NFA<Event> spyNFA = spy(nfa);
+
+ try (
+ OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
+ CepOperatorTestUtilities.getCepTestHarness(createOperatorForNFA(spyNFA).build())) {
+
+ long timestamp = 5;
+ harness.open();
+ harness.setProcessingTime(timestamp);
+ StreamRecord<Event> event = event().withTimestamp(3).asStreamRecord();
+ harness.processElement(event);
+ verify(spyNFA).process(
+ any(SharedBufferAccessor.class),
+ any(NFAState.class),
+ eq(event.getValue()),
+ eq(timestamp),
+ any(AfterMatchSkipStrategy.class),
+ any(TimerService.class));
+ }
+ }
+
+ @Test
public void testKeyedCEPOperatorCheckpointing() throws Exception {
OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
index dd080fb..f8e0f57 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.time.TimerService;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.ArrayList;
@@ -41,16 +42,19 @@ public final class NFATestHarness {
private final NFA<Event> nfa;
private final NFAState nfaState;
private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+ private final TimerService timerService;
private NFATestHarness(
SharedBuffer<Event> sharedBuffer,
NFA<Event> nfa,
NFAState nfaState,
- AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ AfterMatchSkipStrategy afterMatchSkipStrategy,
+ TimerService timerService) {
this.sharedBuffer = sharedBuffer;
this.nfa = nfa;
this.nfaState = nfaState;
this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+ this.timerService = timerService;
}
/**
@@ -68,7 +72,7 @@ public final class NFATestHarness {
}
public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
- List<List<Event>> resultingPatterns = new ArrayList<>();
+ final List<List<Event>> resultingPatterns = new ArrayList<>();
for (StreamRecord<Event> inputEvent : inputEvents) {
resultingPatterns.addAll(feedRecord(inputEvent));
}
@@ -76,8 +80,8 @@ public final class NFATestHarness {
}
public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
- List<List<Event>> resultingPatterns = new ArrayList<>();
- Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+ final List<List<Event>> resultingPatterns = new ArrayList<>();
+ final Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
for (Map<String, List<Event>> p : matches) {
List<Event> res = new ArrayList<>();
for (List<Event> le : p.values()) {
@@ -89,7 +93,7 @@ public final class NFATestHarness {
}
public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
- List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+ final List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
for (StreamRecord<Event> inputEvent : inputEvents) {
resultingPatterns.addAll(consumeRecord(inputEvent));
}
@@ -105,7 +109,8 @@ public final class NFATestHarness {
nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
- afterMatchSkipStrategy);
+ afterMatchSkipStrategy,
+ timerService);
}
}
@@ -129,12 +134,13 @@ public final class NFATestHarness {
@Override
public NFATestHarness build() {
- NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+ final NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
return new NFATestHarness(
sharedBuffer,
nfa,
nfa.createInitialNFAState(),
- afterMatchSkipStrategy);
+ afterMatchSkipStrategy,
+ timerService);
}
}
@@ -159,7 +165,12 @@ public final class NFATestHarness {
@Override
public NFATestHarness build() {
- return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+ return new NFATestHarness(
+ sharedBuffer,
+ nfa,
+ nfaState,
+ afterMatchSkipStrategy,
+ timerService);
}
}
@@ -171,6 +182,7 @@ public final class NFATestHarness {
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
AfterMatchSkipStrategy afterMatchSkipStrategy;
+ TimerService timerService = new TestTimerService();
NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
this.afterMatchSkipStrategy = skipStrategy;
@@ -186,6 +198,11 @@ public final class NFATestHarness {
return this;
}
+ public NFATestHarnessBuilderBase withTimerService(TimerService timerService) {
+ this.timerService = timerService;
+ return this;
+ }
+
public abstract NFATestHarness build();
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
similarity index 51%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
index 4f0eab2..5fd9755 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
@@ -16,28 +16,26 @@
* limitations under the License.
*/
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.utils;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.time.TimerService;
/**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Test implementation of {@link TimerService}. Provides setters for encapsulated properties.
*/
-@PublicEvolving
-public interface TimerContext {
-
- /**
- * Timestamp of the element currently being processed.
- *
- * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
- * time when event entered the cep operator.
- */
- long timestamp();
-
- /** Returns the current processing time. */
- long currentProcessingTime();
+public class TestTimerService implements TimerService {
+
+ public static final long UNDEFINED_TIME = Long.MIN_VALUE;
+
+ private Long processingTime = UNDEFINED_TIME;
+
+ @Override
+ public long currentProcessingTime() {
+ return processingTime;
+ }
+
+ public void setCurrentProcessingTime(long processingTime) {
+ this.processingTime = processingTime;
+ }
}