You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/11/10 13:44:29 UTC

flink git commit: [FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator

Repository: flink
Updated Branches:
  refs/heads/release-1.1 290f8a25f -> 3eb563332


[FLINK-5033] [cep] Advance time with incoming watermarks at CEP operator

Previously, the time was only advanced if the CEP operator had some events buffered. If the
priority queue was empty, then an incoming watermark did not advance the time. This led to
missed timeouts and pruning opportunities. This commit fixes the problem.

This closes #2771.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3eb56333
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3eb56333
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3eb56333

Branch: refs/heads/release-1.1
Commit: 3eb563332cce5f96fbc5f093adec22d6d1a983bf
Parents: 290f8a2
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Nov 8 15:14:36 2016 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Nov 10 14:28:30 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  10 +-
 .../AbstractCEPBasePatternOperator.java         |   9 ++
 .../operator/AbstractCEPPatternOperator.java    |  12 +-
 .../AbstractKeyedCEPPatternOperator.java        |  11 +-
 .../flink/cep/operator/CEPPatternOperator.java  |  22 ++-
 .../cep/operator/KeyedCEPPatternOperator.java   |  22 ++-
 .../cep/operator/TimeoutCEPPatternOperator.java |  35 ++--
 .../TimeoutKeyedCEPPatternOperator.java         |  39 +++--
 .../flink/cep/operator/CEPOperatorTest.java     | 161 ++++++++++++++++++-
 .../util/OneInputStreamOperatorTestHarness.java |  14 +-
 10 files changed, 291 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 624db0d..3bb9088 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -123,7 +123,7 @@ public class NFA<T> implements Serializable {
 	 * resulting event sequences are returned. If computations time out and timeout handling is
 	 * activated, then the timed out event patterns are returned.
 	 *
-	 * @param event The current event to be processed
+	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
@@ -141,7 +141,7 @@ public class NFA<T> implements Serializable {
 			final Collection<ComputationState<T>> newComputationStates;
 
 			if (!computationState.isStartState() &&
-				windowTime > 0 &&
+				windowTime > 0L &&
 				timestamp - computationState.getStartTimestamp() >= windowTime) {
 
 				if (handleTimeout) {
@@ -158,8 +158,10 @@ public class NFA<T> implements Serializable {
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 
 				newComputationStates = Collections.emptyList();
-			} else {
+			} else if (event != null) {
 				newComputationStates = computeNextStates(computationState, event, timestamp);
+			} else {
+				newComputationStates = Collections.singleton(computationState);
 			}
 
 			for (ComputationState<T> newComputationState: newComputationStates) {
@@ -179,7 +181,7 @@ public class NFA<T> implements Serializable {
 		}
 
 		// prune shared buffer based on window length
-		if(windowTime > 0) {
+		if(windowTime > 0L) {
 			long pruningTimestamp = timestamp - windowTime;
 
 			// sanity check to guard against underflows

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
index aad408c..4ead4be 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -95,4 +95,13 @@ public abstract class AbstractCEPBasePatternOperator<IN, OUT>
 	 * @param timestamp The timestamp of the event
 	 */
 	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+
+	/**
+	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
+	 * timeouts.
+	 *
+	 * @param nfa to advance the time for
+	 * @param timestamp to advance the time to
+	 */
+	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index c26eaeb..b523f46 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -94,10 +94,14 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 	@Override
 	public void processWatermark(Watermark mark) throws Exception {
-		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+		if (priorityQueue.isEmpty()) {
+			advanceTime(nfa, mark.getTimestamp());
+		} else {
+			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+				StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+			}
 		}
 
 		output.emitWatermark(mark);

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 206be47..03e40ac 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -170,11 +170,16 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 			NFA<IN> nfa = getNFA();
 
-			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-				StreamRecord<IN> streamRecord = priorityQueue.poll();
+			if (priorityQueue.isEmpty()) {
+					advanceTime(nfa, mark.getTimestamp());
+			} else {
+				while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+					StreamRecord<IN> streamRecord = priorityQueue.poll();
 
-				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+					processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+				}
 			}
+
 			updateNFA(nfa);
 			updatePriorityQueue(priorityQueue);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index 561697d..57f27c2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -48,15 +49,28 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<S
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 62d82d9..4d8a907 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -26,6 +26,7 @@ import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -51,15 +52,28 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		if (!matchedPatterns.isEmpty()) {
+		emitMatchedSequences(matchedPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+	}
+
+	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
+
+		if (iterator.hasNext()) {
 			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
 				null,
 				timestamp);
 
-			for (Map<String, IN> pattern: matchedPatterns) {
-				streamRecord.replace(pattern);
+			do {
+				streamRecord.replace(iterator.next());
 				output.collect(streamRecord);
-			}
+			} while (iterator.hasNext());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
index 9b0c951..9a04468 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
@@ -58,22 +58,37 @@ public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN
 		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
 
+		emitMatchedSequences(matchedPatterns, timestamp);
+		emitTimedOutSequences(partialPatterns, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 23b1a91..4d33435 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -48,25 +48,40 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			event,
 			timestamp);
 
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
+		Collection<Map<String, IN>> matchedSequences = patterns.f0;
+		Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
 
+		emitMatchedSequences(matchedSequences, timestamp);
+		emitTimedOutSequences(timedOutSequences, timestamp);
+	}
+
+	@Override
+	protected void advanceTime(NFA<IN> nfa, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
+	}
+
+	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
 		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
 			null,
 			timestamp);
 
-		if (!matchedPatterns.isEmpty()) {
-			for (Map<String, IN> matchedPattern : matchedPatterns) {
-				streamRecord.replace(Either.Right(matchedPattern));
-				output.collect(streamRecord);
-			}
+		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
+			streamRecord.replace(Either.Left(partialPattern));
+			output.collect(streamRecord);
 		}
+	}
+
+	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
 
-		if (!partialPatterns.isEmpty()) {
-			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
-				streamRecord.replace(Either.Left(partialPattern));
-				output.collect(streamRecord);
-			}
+		for (Map<String, IN> matchedPattern : matchedSequences) {
+			streamRecord.replace(Either.Right(matchedPattern));
+			output.collect(streamRecord);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 56c4161..60db7a3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.NFA;
@@ -34,6 +37,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
@@ -41,7 +45,9 @@ import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.*;
 
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class CEPOperatorTest extends TestLogger {
@@ -403,10 +409,161 @@ public class CEPOperatorTest extends TestLogger {
 		harness.close();
 	}
 
+	/**
+	 * Tests that the internal time of a keyed CEP operator advances with incoming watermarks even if no elements are buffered. See FLINK-5033
+	 */
+	@Test
+	public void testKeyedAdvancingTimeWithoutElements() throws Exception {
+		final KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Event value) throws Exception {
+				return value.getId();
+			}
+		};
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
+			new TimeoutKeyedCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				keySelector,
+				IntSerializer.INSTANCE,
+				new NFAFactory(true)));
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
+	/**
+	 * Tests that the internal time of a CEP operator advances with incoming watermarks even if no elements are buffered. See FLINK-5033
+	 */
+	@Test
+	public void testAdvancingTimeWithoutElements() throws Exception {
+		final Event startEvent = new Event(42, "start", 1.0);
+		final long watermarkTimestamp1 = 5L;
+		final long watermarkTimestamp2 = 13L;
+
+		final Map<String, Event> expectedSequence = new HashMap<>(2);
+		expectedSequence.put("start", startEvent);
+
+		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
+			new TimeoutCEPPatternOperator<>(
+				Event.createTypeSerializer(),
+				false,
+				new NFAFactory(true))
+		);
+
+		try {
+			harness.setup(
+				new KryoSerializer<>(
+					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
+					new ExecutionConfig()));
+			harness.open();
+
+			harness.processElement(new StreamRecord<>(startEvent, 3L));
+			harness.processWatermark(new Watermark(watermarkTimestamp1));
+			harness.processWatermark(new Watermark(watermarkTimestamp2));
+
+			Queue<Object> result = harness.getOutput();
+
+			assertEquals(3, result.size());
+
+			Object watermark1 = result.poll();
+
+			assertTrue(watermark1 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
+
+			Object resultObject = result.poll();
+
+			assertTrue(resultObject instanceof StreamRecord);
+
+			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
+
+			assertTrue(streamRecord.getValue() instanceof Either.Left);
+
+			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
+
+			Tuple2<Map<String, Event>, Long> leftResult = left.left();
+
+			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
+			assertEquals(expectedSequence, leftResult.f0);
+
+			Object watermark2 = result.poll();
+
+			assertTrue(watermark2 instanceof Watermark);
+
+			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
+		} finally {
+			harness.close();
+		}
+	}
+
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
 
 		private static final long serialVersionUID = 1173020762472766713L;
 
+		private final boolean handleTimeout;
+
+		private NFAFactory() {
+			this(false);
+		}
+
+		private NFAFactory(boolean handleTimeout) {
+			this.handleTimeout = handleTimeout;
+		}
+
 		@Override
 		public NFA<Event> createNFA() {
 
@@ -436,9 +593,9 @@ public class CEPOperatorTest extends TestLogger {
 					})
 					// add a window timeout to test whether timestamps of elements in the
 					// priority queue in CEP operator are correctly checkpointed/restored
-					.within(Time.milliseconds(10));
+					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3eb56333/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 5708639..12acfb6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -197,7 +197,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	 * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
 	 */
 	public void setup() throws Exception {
-		operator.setup(mockTask, config, new MockOutput());
+		setup(null);
+	}
+
+	/**
+	 * Calls
+	 * {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)}
+	 */
+	public void setup(TypeSerializer<OUT> outputTypeSerializer) {
+		operator.setup(mockTask, config, new MockOutput(outputTypeSerializer));
 		setupCalled = true;
 	}
 
@@ -282,6 +290,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 
 		private TypeSerializer<OUT> outputSerializer;
 
+		public MockOutput(TypeSerializer<OUT> outputSerializer) {
+			this.outputSerializer = outputSerializer;
+		}
+
 		@Override
 		public void emitWatermark(Watermark mark) {
 			outputList.add(mark);