You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/02/24 10:12:35 UTC

flink git commit: [FLINK-5845] [cep] Unify keyed and non-keyed operators.

Repository: flink
Updated Branches:
  refs/heads/master dd8ef550c -> 15ae922ad


[FLINK-5845] [cep] Unify keyed and non-keyed operators.

Now all cep operators are keyed, and for the non-keyed
use cases, we key on a dummy key and use the keyed operator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/15ae922a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/15ae922a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/15ae922a

Branch: refs/heads/master
Commit: 15ae922ad4151701cbb4e0df207f43d0094366d1
Parents: dd8ef55
Author: kl0u <kk...@gmail.com>
Authored: Thu Feb 16 12:02:25 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Feb 24 11:10:40 2017 +0100

----------------------------------------------------------------------
 .../flink/api/java/functions/KeySelector.java   |   2 +-
 .../api/java/functions/NullByteKeySelector.java |  39 ++++
 .../AbstractCEPBasePatternOperator.java         | 108 ------------
 .../operator/AbstractCEPPatternOperator.java    | 142 ---------------
 .../AbstractKeyedCEPPatternOperator.java        | 128 ++++++++++----
 .../flink/cep/operator/CEPOperatorUtils.java    |  23 ++-
 .../flink/cep/operator/CEPPatternOperator.java  |  76 --------
 .../cep/operator/KeyedCEPPatternOperator.java   |  22 +--
 .../cep/operator/TimeoutCEPPatternOperator.java |  94 ----------
 .../TimeoutKeyedCEPPatternOperator.java         |  43 ++---
 .../flink/cep/operator/CEPOperatorTest.java     | 176 -------------------
 .../api/datastream/AllWindowedStream.java       |  14 +-
 12 files changed, 186 insertions(+), 681 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
index d96f078..da3b3e2 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java
@@ -34,7 +34,7 @@ import java.io.Serializable;
  */
 @Public
 public interface KeySelector<IN, KEY> extends Function, Serializable {
-	
+
 	/**
 	 * User-defined function that extracts the key from an arbitrary object.
 	 * 

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
new file mode 100644
index 0000000..4aa533d
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/functions/NullByteKeySelector.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.functions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Used as a dummy {@link KeySelector} to allow using keyed operators
+ * for non-keyed use cases. Essentially, it gives all incoming records
+ * the same key, which is a {@code (byte) 0} value.
+ *
+ * @param <T> The type of the input element.
+ */
+@Internal
+public class NullByteKeySelector<T> implements KeySelector<T, Byte> {
+
+	private static final long serialVersionUID = 614256539098549020L;
+
+	@Override
+	public Byte getKey(T value) throws Exception {
+		return 0; // the int constant 0 is implicitly narrowed to byte and boxed to Byte
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
deleted file mode 100644
index a3497a6..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.util.PriorityQueue;
-
-/**
- * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event
- * patterns. The detected event patterns are then outputted to the down stream operators.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type fo the output elements
- */
-public abstract class AbstractCEPBasePatternOperator<IN, OUT>
-	extends AbstractStreamOperator<OUT>
-	implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
-
-	private static final long serialVersionUID = -4166778210774160757L;
-
-	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
-
-	private final TypeSerializer<IN> inputSerializer;
-	private final boolean isProcessingTime;
-
-	public AbstractCEPBasePatternOperator(
-			final TypeSerializer<IN> inputSerializer,
-			final boolean isProcessingTime) {
-		this.inputSerializer = inputSerializer;
-		this.isProcessingTime = isProcessingTime;
-	}
-
-	public TypeSerializer<IN> getInputSerializer() {
-		return inputSerializer;
-	}
-
-	protected abstract NFA<IN> getNFA() throws IOException;
-
-	protected abstract void updateNFA(NFA<IN> nfa) throws IOException;
-
-	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
-
-	protected abstract void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException;
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		if (isProcessingTime) {
-			// there can be no out of order elements in processing time
-			NFA<IN> nfa = getNFA();
-			processEvent(nfa, element.getValue(), System.currentTimeMillis());
-			updateNFA(nfa);
-		} else {
-			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
-			// event time processing
-			// we have to buffer the elements until we receive the proper watermark
-			if (getExecutionConfig().isObjectReuseEnabled()) {
-				// copy the StreamRecord so that it cannot be changed
-				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
-			} else {
-				priorityQueue.offer(element);
-			}
-			updatePriorityQueue(priorityQueue);
-		}
-	}
-
-	/**
-	 * Process the given event by giving it to the NFA and outputting the produced set of matched
-	 * event sequences.
-	 *
-	 * @param nfa NFA to be used for the event detection
-	 * @param event The current event to be processed
-	 * @param timestamp The timestamp of the event
-	 */
-	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
-
-	/**
-	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
-	 * timeouts.
-	 *
-	 * @param nfa to advance the time for
-	 * @param timestamp to advance the time to
-	 */
-	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
deleted file mode 100644
index fe9aced..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.PriorityQueue;
-
-/**
- * Abstract CEP pattern operator which is used for non keyed streams. Consequently,
- * the operator state only includes a single {@link NFA} and a priority queue to order out of order
- * elements in case of event time processing.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type of the output elements
- */
-abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
-	private static final long serialVersionUID = 7487334510746595640L;
-
-	private final StreamElementSerializer<IN> streamRecordSerializer;
-
-	// global nfa for all elements
-	private NFA<IN> nfa;
-
-	// queue to buffer out of order stream records
-	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
-
-	public AbstractCEPPatternOperator(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
-		super(inputSerializer, isProcessingTime);
-
-		this.streamRecordSerializer = new StreamElementSerializer<>(inputSerializer);
-		this.nfa = nfaFactory.createNFA();
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		if (priorityQueue == null) {
-			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
-		}
-	}
-
-	@Override
-	protected NFA<IN> getNFA() throws IOException {
-		return nfa;
-	}
-
-	@Override
-	protected void updateNFA(NFA<IN> nfa) {
-		// a no-op, because we only have one NFA
-	}
-
-	@Override
-	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
-		return priorityQueue;
-	}
-
-	@Override
-	protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) {
-		// a no-op, because we only have one priority queue
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// we do our own watermark handling, no super call. we will never be able to use
-		// the timer service like this, however.
-
-		if (priorityQueue.isEmpty()) {
-			advanceTime(nfa, mark.getTimestamp());
-		} else {
-			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-				StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
-			}
-		}
-
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
-		final ObjectOutputStream oos = new ObjectOutputStream(out);
-
-		oos.writeObject(nfa);
-		oos.writeInt(priorityQueue.size());
-
-		for (StreamRecord<IN> streamRecord: priorityQueue) {
-			streamRecordSerializer.serialize(streamRecord, new DataOutputViewStreamWrapper(oos));
-		}
-		oos.flush();
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void restoreState(FSDataInputStream state) throws Exception {
-		final ObjectInputStream ois = new ObjectInputStream(state);
-
-		nfa = (NFA<IN>)ois.readObject();
-
-		int numberPriorityQueueEntries = ois.readInt();
-
-		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
-
-		for (int i = 0; i <numberPriorityQueueEntries; i++) {
-			StreamElement streamElement = streamRecordSerializer.deserialize(new DataInputViewStreamWrapper(ois));
-			priorityQueue.offer(streamElement.<IN>asRecord());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 832a0ba..90ee846 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -30,9 +30,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -44,7 +48,7 @@ import java.util.Set;
 /**
  * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates
  * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
- * stored using the key value state. Additionally, the set of all seen keys is kept as part of the
+ * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the
  * operator state. This is necessary to trigger the execution for all keys upon receiving a new
  * watermark.
  *
@@ -52,11 +56,17 @@ import java.util.Set;
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
  */
-abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
-	private static final long serialVersionUID = -7234999752950159178L;
+public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
+	extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<IN, OUT>, StreamCheckpointedOperator {
 
-	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
-	private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName";
+	private static final long serialVersionUID = -4166778210774160757L;
+
+	private static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+	private final boolean isProcessingTime;
+
+	private final TypeSerializer<IN> inputSerializer;
 
 	// necessary to extract the key from the input elements
 	private final KeySelector<IN, KEY> keySelector;
@@ -64,29 +74,38 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	// necessary to serialize the set of seen keys
 	private final TypeSerializer<KEY> keySerializer;
 
-	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
-	private final NFACompiler.NFAFactory<IN> nfaFactory;
+	///////////////			State			//////////////
 
 	// stores the keys we've already seen to trigger execution upon receiving a watermark
 	// this can be problematic, since it is never cleared
 	// TODO: fix once the state refactoring is completed
 	private transient Set<KEY> keys;
 
+	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
+	private static final String PRIORITY_QUEUE_STATE_NAME = "priorityQueueStateName";
+
 	private transient ValueState<NFA<IN>> nfaOperatorState;
 	private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
 
-	public AbstractKeyedCEPPatternOperator(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			KeySelector<IN, KEY> keySelector,
-			TypeSerializer<KEY> keySerializer,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
-		super(inputSerializer, isProcessingTime);
+	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+	private final NFACompiler.NFAFactory<IN> nfaFactory;
 
-		this.keySelector = keySelector;
-		this.keySerializer = keySerializer;
+	public AbstractKeyedCEPPatternOperator(
+			final TypeSerializer<IN> inputSerializer,
+			final boolean isProcessingTime,
+			final KeySelector<IN, KEY> keySelector,
+			final TypeSerializer<KEY> keySerializer,
+			final NFACompiler.NFAFactory<IN> nfaFactory) {
+		// isProcessingTime is a primitive boolean and can never be null: assign it without a (boxing) null check.
+		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
+		this.isProcessingTime = isProcessingTime;
+		this.keySelector = Preconditions.checkNotNull(keySelector);
+		this.keySerializer = Preconditions.checkNotNull(keySerializer);
+		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+	}
 
-		this.nfaFactory = nfaFactory;
+	public TypeSerializer<IN> getInputSerializer() {
+		return inputSerializer;
 	}
 
 	@Override
@@ -100,27 +119,27 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 
 		if (nfaOperatorState == null) {
 			nfaOperatorState = getPartitionedState(
-					new ValueStateDescriptor<NFA<IN>>(
-						NFA_OPERATOR_STATE_NAME,
-						new NFA.Serializer<IN>()));
+				new ValueStateDescriptor<>(NFA_OPERATOR_STATE_NAME, new NFA.Serializer<IN>()));
 		}
 
 		@SuppressWarnings("unchecked,rawtypes")
 		TypeSerializer<StreamRecord<IN>> streamRecordSerializer =
-				(TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
+			(TypeSerializer) new StreamElementSerializer<>(getInputSerializer());
 
 		if (priorityQueueOperatorState == null) {
 			priorityQueueOperatorState = getPartitionedState(
-					new ValueStateDescriptor<>(
-						PRIORIRY_QUEUE_STATE_NAME,
-						new PriorityQueueSerializer<>(
-								streamRecordSerializer,
-								new PriorityQueueStreamRecordFactory<IN>())));
+				new ValueStateDescriptor<>(
+					PRIORITY_QUEUE_STATE_NAME,
+					new PriorityQueueSerializer<>(
+						streamRecordSerializer,
+						new PriorityQueueStreamRecordFactory<IN>()
+					)
+				)
+			);
 		}
 	}
 
-	@Override
-	protected NFA<IN> getNFA() throws IOException {
+	private NFA<IN> getNFA() throws IOException {
 		NFA<IN> nfa = nfaOperatorState.value();
 
 		if (nfa == null) {
@@ -132,13 +151,11 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		return nfa;
 	}
 
-	@Override
-	protected void updateNFA(NFA<IN> nfa) throws IOException {
+	private void updateNFA(NFA<IN> nfa) throws IOException {
 		nfaOperatorState.update(nfa);
 	}
 
-	@Override
-	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+	private PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
 		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
 
 		if (priorityQueue == null) {
@@ -150,8 +167,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		return priorityQueue;
 	}
 
-	@Override
-	protected void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
+	private void updatePriorityQueue(PriorityQueue<StreamRecord<IN>> queue) throws IOException {
 		priorityQueueOperatorState.update(queue);
 	}
 
@@ -159,7 +175,24 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	public void processElement(StreamRecord<IN> element) throws Exception {
 		keys.add(keySelector.getKey(element.getValue()));
 
-		super.processElement(element);
+		if (isProcessingTime) {
+			// there can be no out of order elements in processing time
+			NFA<IN> nfa = getNFA();
+			processEvent(nfa, element.getValue(), System.currentTimeMillis());
+			updateNFA(nfa);
+		} else {
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			// event time processing
+			// we have to buffer the elements until we receive the proper watermark
+			if (getExecutionConfig().isObjectReuseEnabled()) {
+				// copy the StreamRecord so that it cannot be changed
+				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+			} else {
+				priorityQueue.offer(element);
+			}
+			updatePriorityQueue(priorityQueue);
+		}
 	}
 
 	@Override
@@ -175,7 +208,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 			NFA<IN> nfa = getNFA();
 
 			if (priorityQueue.isEmpty()) {
-					advanceTime(nfa, mark.getTimestamp());
+				advanceTime(nfa, mark.getTimestamp());
 			} else {
 				while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
 					StreamRecord<IN> streamRecord = priorityQueue.poll();
@@ -191,6 +224,25 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		output.emitWatermark(mark);
 	}
 
+	/**
+	 * Process the given event by giving it to the NFA and outputting the produced set of matched
+	 * event sequences.
+	 *
+	 * @param nfa NFA to be used for the event detection
+	 * @param event The current event to be processed
+	 * @param timestamp The timestamp of the event
+	 */
+	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+
+	/**
+	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
+	 * timeouts.
+	 *
+	 * @param nfa to advance the time for
+	 * @param timestamp to advance the time to
+	 */
+	protected abstract void advanceTime(NFA<IN> nfa, long timestamp);
+
 	@Override
 	public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
 		DataOutputView ov = new DataOutputViewStreamWrapper(out);
@@ -216,6 +268,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		}
 	}
 
+	//////////////////////			Utility Classes			//////////////////////
+
 	/**
 	 * Custom type serializer implementation to serialize priority queues.
 	 *
@@ -228,7 +282,7 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 		private final TypeSerializer<T> elementSerializer;
 		private final PriorityQueueFactory<T> factory;
 
-		public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
+		PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
 			this.elementSerializer = elementSerializer;
 			this.factory = factory;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 39e2ccd..36f2e7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -21,7 +21,9 @@ package org.apache.flink.cep.operator;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -72,12 +74,18 @@ public class CEPOperatorUtils {
 					keySerializer,
 					nfaFactory));
 		} else {
-			patternStream = inputStream.transform(
+
+			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+			TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+			patternStream = inputStream.keyBy(keySelector).transform(
 				"CEPPatternOperator",
 				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-				new CEPPatternOperator<>(
+				new KeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
+					keySelector,
+					keySerializer,
 					nfaFactory
 				)).forceNonParallel();
 		}
@@ -127,12 +135,19 @@ public class CEPOperatorUtils {
 					keySerializer,
 					nfaFactory));
 		} else {
-			patternStream = inputStream.transform(
+
+			KeySelector<T, Byte> keySelector = new NullByteKeySelector<>();
+
+			TypeSerializer<Byte> keySerializer = ByteSerializer.INSTANCE;
+
+			patternStream = inputStream.keyBy(keySelector).transform(
 				"TimeoutCEPPatternOperator",
 				eitherTypeInformation,
-				new TimeoutCEPPatternOperator<>(
+				new TimeoutKeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
+					keySelector,
+					keySerializer,
 					nfaFactory
 				)).forceNonParallel();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
deleted file mode 100644
index 57f27c2..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The
- * events are indexed by the event names associated in the pattern specification.
- *
- * @param <IN> Type of the input events
- */
-public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<String, IN>> {
-	private static final long serialVersionUID = 376300194236250645L;
-
-	public CEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory) {
-		super(inputSerializer, isProcessingTime, nfaFactory);
-	}
-
-	@Override
-	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-			event,
-			timestamp);
-
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
-		emitMatchedSequences(matchedPatterns, timestamp);
-	}
-
-	@Override
-	protected void advanceTime(NFA<IN> nfa, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
-		emitMatchedSequences(patterns.f0, timestamp);
-	}
-
-	private void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
-		Iterator<Map<String, IN>> iterator = matchedSequences.iterator();
-
-		if (iterator.hasNext()) {
-			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
-				null,
-				timestamp);
-
-			do {
-				streamRecord.replace(iterator.next());
-				output.collect(streamRecord);
-			} while (iterator.hasNext());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 4d8a907..5b6ffe2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -40,25 +40,27 @@ import java.util.Map;
 public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
 	private static final long serialVersionUID = 5328573789532074581L;
 
-	public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
+	public KeyedCEPPatternOperator(
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+
 		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
 	}
 
 	@Override
 	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-			event,
-			timestamp);
-
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-
-		emitMatchedSequences(matchedPatterns, timestamp);
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+			nfa.process(event, timestamp);
+		emitMatchedSequences(patterns.f0, timestamp);
 	}
 
 	@Override
 	protected void advanceTime(NFA<IN> nfa, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+			nfa.process(null, timestamp);
 		emitMatchedSequences(patterns.f0, timestamp);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
deleted file mode 100644
index 9a04468..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.types.Either;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * CEP pattern operator which only returns fully matched event patterns and partially matched event
- * patterns which have timed out wrapped in {@link Either}. The matched events are stored in a
- * {@link Map} and are indexed by the event names associated in the pattern specification.
- *
- * The fully matched event patterns are returned as a {@link Either.Right} instance and the
- * partially matched event patterns are returned as a {@link Either.Left} instance.
- *
- * @param <IN> Type of the input events
- */
-public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
-	private static final long serialVersionUID = -3911002597290988201L;
-
-	public TimeoutCEPPatternOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory) {
-
-		super(inputSerializer, isProcessingTime, nfaFactory);
-	}
-
-	@Override
-	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-			event,
-			timestamp);
-
-		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
-		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
-
-		emitMatchedSequences(matchedPatterns, timestamp);
-		emitTimedOutSequences(partialPatterns, timestamp);
-	}
-
-	@Override
-	protected void advanceTime(NFA<IN> nfa, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
-
-		emitMatchedSequences(patterns.f0, timestamp);
-		emitTimedOutSequences(patterns.f1, timestamp);
-	}
-
-	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
-		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-			null,
-			timestamp);
-
-		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
-			streamRecord.replace(Either.Left(partialPattern));
-			output.collect(streamRecord);
-		}
-	}
-
-	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
-		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-			null,
-			timestamp);
-
-		for (Map<String, IN> matchedPattern : matchedSequences) {
-			streamRecord.replace(Either.Right(matchedPattern));
-			output.collect(streamRecord);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 4d33435..6889bb9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -29,44 +29,48 @@ import org.apache.flink.types.Either;
 import java.util.Collection;
 import java.util.Map;
 
+/**
+ * CEP pattern operator which emits fully matched event patterns as {@link Either.Right} and timed-out
+ * partially matched patterns as {@link Either.Left}. The events of a pattern are stored in a {@link Map}
+ * indexed by the event names from the pattern specification. The operator works on keyed input data.
+ *
+ * @param <IN> Type of the input events
+ * @param <KEY> Type of the key
+ */
 public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
 	private static final long serialVersionUID = 3570542177814518158L;
 
 	public TimeoutKeyedCEPPatternOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		KeySelector<IN, KEY> keySelector,
-		TypeSerializer<KEY> keySerializer,
-		NFACompiler.NFAFactory<IN> nfaFactory) {
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
 
 		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
 	}
 
 	@Override
 	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
-			event,
-			timestamp);
-
-		Collection<Map<String, IN>> matchedSequences = patterns.f0;
-		Collection<Tuple2<Map<String, IN>, Long>> timedOutSequences = patterns.f1;
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+			nfa.process(event, timestamp);
 
-		emitMatchedSequences(matchedSequences, timestamp);
-		emitTimedOutSequences(timedOutSequences, timestamp);
+		emitMatchedSequences(patterns.f0, timestamp);
+		emitTimedOutSequences(patterns.f1, timestamp);
 	}
 
 	@Override
 	protected void advanceTime(NFA<IN> nfa, long timestamp) {
-		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(null, timestamp);
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+			nfa.process(null, timestamp);
 
 		emitMatchedSequences(patterns.f0, timestamp);
 		emitTimedOutSequences(patterns.f1, timestamp);
 	}
 
 	private void emitTimedOutSequences(Iterable<Tuple2<Map<String, IN>, Long>> timedOutSequences, long timestamp) {
-		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-			null,
-			timestamp);
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+			new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
 
 		for (Tuple2<Map<String, IN>, Long> partialPattern: timedOutSequences) {
 			streamRecord.replace(Either.Left(partialPattern));
@@ -75,9 +79,8 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 	}
 
 	protected void emitMatchedSequences(Iterable<Map<String, IN>> matchedSequences, long timestamp) {
-		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
-			null,
-			timestamp);
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord =
+			new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(null, timestamp);
 
 		for (Map<String, IN> matchedPattern : matchedSequences) {
 			streamRecord.replace(Either.Right(matchedPattern));

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index db17f6d..f90b670 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -57,29 +57,6 @@ public class CEPOperatorTest extends TestLogger {
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@Test
-	public void testCEPOperatorWatermarkForwarding() throws Exception {
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-			new CEPPatternOperator<>(
-				Event.createTypeSerializer(),
-				false,
-				new NFAFactory())
-		);
-
-		harness.open();
-
-		Watermark expectedWatermark = new Watermark(42L);
-
-		harness.processWatermark(expectedWatermark);
-
-		Object watermark = harness.getOutput().poll();
-
-		assertTrue(watermark instanceof Watermark);
-		assertEquals(expectedWatermark, watermark);
-
-		harness.close();
-	}
-
-	@Test
 	public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
 			private static final long serialVersionUID = -4873366487571254798L;
@@ -115,94 +92,6 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
-	public void testCEPOperatorCheckpointing() throws Exception {
-		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
-			private static final long serialVersionUID = -4873366487571254798L;
-
-			@Override
-			public Integer getKey(Event value) throws Exception {
-				return value.getId();
-			}
-		};
-
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-				new CEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						new NFAFactory()));
-
-		harness.open();
-
-		Event startEvent = new Event(42, "start", 1.0);
-		SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		Event endEvent=  new Event(42, "end", 1.0);
-
-		harness.processElement(new StreamRecord<Event>(startEvent, 1));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
-
-		// simulate snapshot/restore with some elements in internal sorting queue
-		StreamStateHandle snapshot = harness.snapshotLegacy(0, 0);
-		harness.close();
-
-		harness = new OneInputStreamOperatorTestHarness<>(
-				new CEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						new NFAFactory()));
-
-		harness.setup();
-		harness.restore(snapshot);
-		harness.open();
-
-		harness.processWatermark(new Watermark(Long.MIN_VALUE));
-
-		harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
-
-		// if element timestamps are not correctly checkpointed/restored this will lead to
-		// a pruning time underflow exception in NFA
-		harness.processWatermark(new Watermark(2));
-
-		// simulate snapshot/restore with empty element queue but NFA state
-		StreamStateHandle snapshot2 = harness.snapshotLegacy(1, 1);
-		harness.close();
-
-		harness = new OneInputStreamOperatorTestHarness<>(
-				new CEPPatternOperator<>(
-						Event.createTypeSerializer(),
-						false,
-						new NFAFactory()));
-
-		harness.setup();
-		harness.restore(snapshot2);
-		harness.open();
-
-		harness.processElement(new StreamRecord<Event>(middleEvent, 3));
-		harness.processElement(new StreamRecord<Event>(new Event(42, "start", 1.0), 4));
-		harness.processElement(new StreamRecord<Event>(endEvent, 5));
-
-		harness.processWatermark(new Watermark(Long.MAX_VALUE));
-
-		ConcurrentLinkedQueue<Object> result = harness.getOutput();
-
-		// watermark and the result
-		assertEquals(2, result.size());
-
-		Object resultObject = result.poll();
-		assertTrue(resultObject instanceof StreamRecord);
-		StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
-		assertTrue(resultRecord.getValue() instanceof Map);
-
-		@SuppressWarnings("unchecked")
-		Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
-
-		assertEquals(startEvent, patternMap.get("start"));
-		assertEquals(middleEvent, patternMap.get("middle"));
-		assertEquals(endEvent, patternMap.get("end"));
-
-		harness.close();
-	}
-
-	@Test
 	public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
@@ -493,71 +382,6 @@ public class CEPOperatorTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that the internal time of a CEP operator advances only given watermarks. See FLINK-5033
-	 */
-	@Test
-	public void testAdvancingTimeWithoutElements() throws Exception {
-		final Event startEvent = new Event(42, "start", 1.0);
-		final long watermarkTimestamp1 = 5L;
-		final long watermarkTimestamp2 = 13L;
-
-		final Map<String, Event> expectedSequence = new HashMap<>(2);
-		expectedSequence.put("start", startEvent);
-
-		OneInputStreamOperatorTestHarness<Event, Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> harness = new OneInputStreamOperatorTestHarness<>(
-			new TimeoutCEPPatternOperator<>(
-				Event.createTypeSerializer(),
-				false,
-				new NFAFactory(true))
-		);
-
-		try {
-			harness.setup(
-				new KryoSerializer<>(
-					(Class<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>>) (Object) Either.class,
-					new ExecutionConfig()));
-			harness.open();
-
-			harness.processElement(new StreamRecord<>(startEvent, 3L));
-			harness.processWatermark(new Watermark(watermarkTimestamp1));
-			harness.processWatermark(new Watermark(watermarkTimestamp2));
-
-			Queue<Object> result = harness.getOutput();
-
-			assertEquals(3, result.size());
-
-			Object watermark1 = result.poll();
-
-			assertTrue(watermark1 instanceof Watermark);
-
-			assertEquals(watermarkTimestamp1, ((Watermark) watermark1).getTimestamp());
-
-			Object resultObject = result.poll();
-
-			assertTrue(resultObject instanceof StreamRecord);
-
-			StreamRecord<Either<Tuple2<Map<String, Event>, Long>, Map<String, Event>>> streamRecord = (StreamRecord<Either<Tuple2<Map<String,Event>,Long>,Map<String,Event>>>) resultObject;
-
-			assertTrue(streamRecord.getValue() instanceof Either.Left);
-
-			Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>> left = (Either.Left<Tuple2<Map<String, Event>, Long>, Map<String, Event>>) streamRecord.getValue();
-
-			Tuple2<Map<String, Event>, Long> leftResult = left.left();
-
-			assertEquals(watermarkTimestamp2, (long) leftResult.f1);
-			assertEquals(expectedSequence, leftResult.f0);
-
-			Object watermark2 = result.poll();
-
-			assertTrue(watermark2 instanceof Watermark);
-
-			assertEquals(watermarkTimestamp2, ((Watermark) watermark2).getTimestamp());
-		} finally {
-			harness.close();
-		}
-	}
-
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
 
 		private static final long serialVersionUID = 1173020762472766713L;

http://git-wip-us.apache.org/repos/asf/flink/blob/15ae922a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 4f4546e..f883ef5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
@@ -1124,17 +1125,4 @@ public class AllWindowedStream<T, W extends Window> {
 	public TypeInformation<T> getInputType() {
 		return input.getType();
 	}
-
-	/**
-	 * Used as dummy KeySelector to allow using WindowOperator for Non-Keyed Windows.
-	 * @param <T>
-	 */
-	private static class NullByteKeySelector<T> implements KeySelector<T, Byte> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Byte getKey(T value) throws Exception {
-			return 0;
-		}
-	}
 }