You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/06/16 14:49:28 UTC

[1/2] flink git commit: [FLINK-3317] [cep] Introduce timeout handler to CEP operator

Repository: flink
Updated Branches:
  refs/heads/master c78b3c49e -> 57ef6c315


http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
new file mode 100644
index 0000000..373b9f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates
+ * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
+ * stored using the key value state. Additionally, the set of all seen keys is kept as part of the
+ * operator state. This is necessary to trigger the execution for all keys upon receiving a new
+ * watermark.
+ *
+ * @param <IN> Type of the input elements
+ * @param <KEY> Type of the key on which the input stream is keyed
+ * @param <OUT> Type of the output elements
+ */
+abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
+	private static final long serialVersionUID = -7234999752950159178L;
+
+	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
+	private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName";
+
+	// necessary to extract the key from the input elements
+	private final KeySelector<IN, KEY> keySelector;
+
+	// necessary to serialize the set of seen keys
+	private final TypeSerializer<KEY> keySerializer;
+
+	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
+	private final NFACompiler.NFAFactory<IN> nfaFactory;
+
+	// stores the keys we've already seen to trigger execution upon receiving a watermark
+	// this can be problematic, since it is never cleared
+	// TODO: fix once the state refactoring is completed
+	private transient Set<KEY> keys;
+
+	private transient ValueState<NFA<IN>> nfaOperatorState;
+	private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
+
+	public AbstractKeyedCEPPatternOperator(
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			KeySelector<IN, KEY> keySelector,
+			TypeSerializer<KEY> keySerializer,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime);
+
+		this.keySelector = keySelector;
+		this.keySerializer = keySerializer;
+
+		this.nfaFactory = nfaFactory;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void open() throws Exception {
+		if (keys == null) {
+			keys = new HashSet<>();
+		}
+
+		if (nfaOperatorState == null) {
+			nfaOperatorState = getPartitionedState(
+					new ValueStateDescriptor<NFA<IN>>(
+						NFA_OPERATOR_STATE_NAME,
+						new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()),
+						null));
+		}
+
+		if (priorityQueueOperatorState == null) {
+			priorityQueueOperatorState = getPartitionedState(
+					new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
+						PRIORIRY_QUEUE_STATE_NAME,
+						new PriorityQueueSerializer<StreamRecord<IN>>(
+							new StreamRecordSerializer<IN>(getInputSerializer()),
+							new PriorityQueueStreamRecordFactory<IN>()),
+						null));
+		}
+	}
+
+	@Override
+	protected NFA<IN> getNFA() throws IOException {
+		NFA<IN> nfa = nfaOperatorState.value();
+
+		if (nfa == null) {
+			nfa = nfaFactory.createNFA();
+
+			nfaOperatorState.update(nfa);
+		}
+
+		return nfa;
+	}
+
+	@Override
+	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
+
+		if (priorityQueue == null) {
+			priorityQueue = priorityQueueFactory.createPriorityQueue();
+
+			priorityQueueOperatorState.update(priorityQueue);
+		}
+
+		return priorityQueue;
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		keys.add(keySelector.getKey(element.getValue()));
+
+		super.processElement(element);
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// iterate over all keys to trigger the execution of the buffered elements
+		for (KEY key: keys) {
+			setKeyContext(key);
+
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			NFA<IN> nfa = getNFA();
+
+			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+				StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
+			}
+		}
+
+		output.emitWatermark(mark);
+	}
+
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
+
+		ov.writeInt(keys.size());
+
+		for (KEY key: keys) {
+			keySerializer.serialize(key, ov);
+		}
+
+		taskState.setOperatorState(ov.closeAndGetHandle());
+
+		return taskState;
+	}
+
+	@Override
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		@SuppressWarnings("unchecked")
+		StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state;
+
+		DataInputView inputView = stateHandle.getState(getUserCodeClassloader());
+
+		if (keys == null) {
+			keys = new HashSet<>();
+		}
+
+		int numberEntries = inputView.readInt();
+
+		for (int i = 0; i <numberEntries; i++) {
+			keys.add(keySerializer.deserialize(inputView));
+		}
+	}
+
+	/**
+	 * Custom type serializer implementation to serialize priority queues.
+	 *
+	 * @param <T> Type of the priority queue's elements
+	 */
+	private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
+
+		private static final long serialVersionUID = -231980397616187715L;
+
+		private final TypeSerializer<T> elementSerializer;
+		private final PriorityQueueFactory<T> factory;
+
+		public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
+			this.elementSerializer = elementSerializer;
+			this.factory = factory;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<PriorityQueue<T>> duplicate() {
+			return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
+		}
+
+		@Override
+		public PriorityQueue<T> createInstance() {
+			return factory.createPriorityQueue();
+		}
+
+		@Override
+		public PriorityQueue<T> copy(PriorityQueue<T> from) {
+			PriorityQueue<T> result = factory.createPriorityQueue();
+
+			for (T element: from) {
+				result.offer(elementSerializer.copy(element));
+			}
+
+			return result;
+		}
+
+		@Override
+		public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
+			reuse.clear();
+
+			for (T element: from) {
+				reuse.offer(elementSerializer.copy(element));
+			}
+
+			return reuse;
+		}
+
+		@Override
+		public int getLength() {
+			return 0;
+		}
+
+		@Override
+		public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
+			target.writeInt(record.size());
+
+			for (T element: record) {
+				elementSerializer.serialize(element, target);
+			}
+		}
+
+		@Override
+		public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
+			PriorityQueue<T> result = factory.createPriorityQueue();
+
+			return deserialize(result, source);
+		}
+
+		@Override
+		public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
+			reuse.clear();
+
+			int numberEntries = source.readInt();
+
+			for (int i = 0; i < numberEntries; i++) {
+				reuse.offer(elementSerializer.deserialize(source));
+			}
+
+			return reuse;
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj instanceof PriorityQueueSerializer) {
+				@SuppressWarnings("unchecked")
+				PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
+
+				return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
+			} else {
+				return false;
+			}
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj instanceof PriorityQueueSerializer;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(factory, elementSerializer);
+		}
+	}
+
+	private interface PriorityQueueFactory<T> extends Serializable {
+		PriorityQueue<T> createPriorityQueue();
+	}
+
+	private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
+
+		private static final long serialVersionUID = 1254766984454616593L;
+
+		@Override
+		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
+			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
new file mode 100644
index 0000000..a3fffa5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.types.Either;
+
+import java.util.Map;
+
+public class CEPOperatorUtils {
+	private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator";
+
+	/**
+	 * Creates a data stream containing the fully matching event patterns of the NFA computation.
+	 *
+	 * @param <K> Type of the key
+	 * @return Data stream containing fully matched event sequences stored in a {@link Map}. The
+	 * events are indexed by their associated names of the pattern.
+	 */
+	public static <K, T> DataStream<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+
+		// check whether we use processing time
+		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		// compile our pattern into a NFAFactory to instantiate NFAs later on
+		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
+
+		final DataStream<Map<String, T>> patternStream;
+
+		if (inputStream instanceof KeyedStream) {
+			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
+			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+
+			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
+			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
+
+			patternStream = keyedStream.transform(
+				PATTERN_OPERATOR_NAME,
+				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
+				new KeyedCEPPatternOperator<>(
+					inputSerializer,
+					isProcessingTime,
+					keySelector,
+					keySerializer,
+					nfaFactory));
+		} else {
+			patternStream = inputStream.transform(
+				PATTERN_OPERATOR_NAME,
+				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
+				new CEPPatternOperator<T>(
+					inputSerializer,
+					isProcessingTime,
+					nfaFactory
+				)).setParallelism(1);
+		}
+
+		return patternStream;
+	}
+
+	/**
+	 * Creates a data stream containing fully matching event patterns or partially matching event
+	 * patterns which have timed out. The former are wrapped in a Either.Right and the latter in a
+	 * Either.Left type.
+	 *
+	 * @param <K> Type of the key
+	 * @return Data stream containing fully matched and partially matched event sequences wrapped in
+	 * a {@link Either} instance.
+	 */
+	public static <K, T> DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+
+		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+
+		// check whether we use processing time
+		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		// compile our pattern into a NFAFactory to instantiate NFAs later on
+		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true);
+
+		final DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream;
+
+		final TypeInformation<Map<String, T>> rightTypeInfo = (TypeInformation<Map<String, T>>) (TypeInformation<?>)  TypeExtractor.getForClass(Map.class);
+		final TypeInformation<Tuple2<Map<String, T>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO);
+		final TypeInformation<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> eitherTypeInformation = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+
+		if (inputStream instanceof KeyedStream) {
+			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
+			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+
+			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
+			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
+
+			patternStream = keyedStream.transform(
+				PATTERN_OPERATOR_NAME,
+				eitherTypeInformation,
+				new TimeoutKeyedCEPPatternOperator<T, K>(
+					inputSerializer,
+					isProcessingTime,
+					keySelector,
+					keySerializer,
+					nfaFactory));
+		} else {
+			patternStream = inputStream.transform(
+				PATTERN_OPERATOR_NAME,
+				eitherTypeInformation,
+				new TimeoutCEPPatternOperator<T>(
+					inputSerializer,
+					isProcessingTime,
+					nfaFactory
+				)).setParallelism(1);
+		}
+
+		return patternStream;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index 7760817..561697d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -19,123 +19,44 @@
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.PriorityQueue;
+import java.util.Collection;
+import java.util.Map;
 
 /**
- * CEP pattern operator implementation which is used for non keyed streams. Consequently,
- * the operator state only includes a single {@link NFA} and a priority queue to order out of order
- * elements in case of event time processing.
+ * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The
+ * events are indexed by the event names associated in the pattern specification.
  *
- * @param <IN> Type of the input elements
+ * @param <IN> Type of the input events
  */
-public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN> {
-	private static final long serialVersionUID = 7487334510746595640L;
+public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Map<String, IN>> {
+	private static final long serialVersionUID = 376300194236250645L;
 
-	private final StreamRecordSerializer<IN> streamRecordSerializer;
-
-	// global nfa for all elements
-	private NFA<IN> nfa;
-
-	// queue to buffer out of order stream records
-	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
-
-	public CEPPatternOperator(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
-		super(inputSerializer, isProcessingTime);
-
-		this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
-		this.nfa = nfaFactory.createNFA();
-	}
-
-	@Override
-	public void open() {
-		if (priorityQueue == null) {
-			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
-		}
-	}
-
-	@Override
-	protected NFA<IN> getNFA() throws IOException {
-		return nfa;
-	}
-
-	@Override
-	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
-		return priorityQueue;
+	public CEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime, nfaFactory);
 	}
 
 	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-			StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
-		}
-
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-		final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
-			checkpointId,
+	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
+			event,
 			timestamp);
 
-		final ObjectOutputStream oos = new ObjectOutputStream(os);
-		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		oos.writeObject(nfa);
+		if (!matchedPatterns.isEmpty()) {
+			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
+				null,
+				timestamp);
 
-		ov.writeInt(priorityQueue.size());
-
-		for (StreamRecord<IN> streamRecord: priorityQueue) {
-			streamRecordSerializer.serialize(streamRecord, ov);
+			for (Map<String, IN> pattern: matchedPatterns) {
+				streamRecord.replace(pattern);
+				output.collect(streamRecord);
+			}
 		}
-
-		taskState.setOperatorState(os.closeAndGetHandle());
-
-		return taskState;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
-
-		StreamStateHandle stream = (StreamStateHandle)state.getOperatorState();
-
-		final InputStream is = stream.getState(getUserCodeClassloader());
-		final ObjectInputStream ois = new ObjectInputStream(is);
-		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
-
-		nfa = (NFA<IN>)ois.readObject();
-
-		int numberPriorityQueueEntries = div.readInt();
-
-		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
-
-		for (int i = 0; i <numberPriorityQueueEntries; i++) {
-			priorityQueue.offer(streamRecordSerializer.deserialize(div));
-		}
-
-		div.close();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 5db8ef2..62d82d9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -18,319 +18,48 @@
 
 package org.apache.flink.cep.operator;
 
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
+import java.util.Collection;
+import java.util.Map;
 
 /**
- * CEP pattern operator implementation for a keyed input stream. For each key, the operator creates
- * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
- * stored using the key value state. Additionally, the set of all seen keys is kept as part of the
- * operator state. This is necessary to trigger the execution for all keys upon receiving a new
- * watermark.
+ * CEP pattern operator which only returns fully matched event patterns stored in a {@link Map}. The
+ * events are indexed by the event names associated in the pattern specification. The operator works
+ * on keyed input data.
  *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
+ * @param <IN> Type of the input events
+ * @param <KEY> Type of the key
  */
-public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator<IN> {
-	private static final long serialVersionUID = -7234999752950159178L;
+public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Map<String, IN>> {
+	private static final long serialVersionUID = 5328573789532074581L;
 
-	private static final String NFA_OPERATOR_STATE_NAME = "nfaOperatorState";
-	private static final String PRIORIRY_QUEUE_STATE_NAME = "priorityQueueStateName";
-
-	// necessary to extract the key from the input elements
-	private final KeySelector<IN, KEY> keySelector;
-
-	// necessary to serialize the set of seen keys
-	private final TypeSerializer<KEY> keySerializer;
-
-	private final PriorityQueueFactory<StreamRecord<IN>> priorityQueueFactory = new PriorityQueueStreamRecordFactory<>();
-	private final NFACompiler.NFAFactory<IN> nfaFactory;
-
-	// stores the keys we've already seen to trigger execution upon receiving a watermark
-	// this can be problematic, since it is never cleared
-	// TODO: fix once the state refactoring is completed
-	private transient Set<KEY> keys;
-
-	private transient ValueState<NFA<IN>> nfaOperatorState;
-	private transient ValueState<PriorityQueue<StreamRecord<IN>>> priorityQueueOperatorState;
-
-	public KeyedCEPPatternOperator(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			KeySelector<IN, KEY> keySelector,
-			TypeSerializer<KEY> keySerializer,
-			NFACompiler.NFAFactory<IN> nfaFactory) {
-		super(inputSerializer, isProcessingTime);
-
-		this.keySelector = keySelector;
-		this.keySerializer = keySerializer;
-
-		this.nfaFactory = nfaFactory;
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public void open() throws Exception {
-		if (keys == null) {
-			keys = new HashSet<>();
-		}
-
-		if (nfaOperatorState == null) {
-			nfaOperatorState = getPartitionedState(
-					new ValueStateDescriptor<NFA<IN>>(
-						NFA_OPERATOR_STATE_NAME,
-						new KryoSerializer<NFA<IN>>((Class<NFA<IN>>) (Class<?>) NFA.class, getExecutionConfig()),
-						null));
-		}
-
-		if (priorityQueueOperatorState == null) {
-			priorityQueueOperatorState = getPartitionedState(
-					new ValueStateDescriptor<PriorityQueue<StreamRecord<IN>>>(
-						PRIORIRY_QUEUE_STATE_NAME,
-						new PriorityQueueSerializer<StreamRecord<IN>>(
-							new StreamRecordSerializer<IN>(getInputSerializer()),
-							new PriorityQueueStreamRecordFactory<IN>()),
-						null));
-		}
-	}
-
-	@Override
-	protected NFA<IN> getNFA() throws IOException {
-		NFA<IN> nfa = nfaOperatorState.value();
-
-		if (nfa == null) {
-			nfa = nfaFactory.createNFA();
-
-			nfaOperatorState.update(nfa);
-		}
-
-		return nfa;
+	public KeyedCEPPatternOperator(TypeSerializer<IN> inputSerializer, boolean isProcessingTime, KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
 	}
 
 	@Override
-	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
-		PriorityQueue<StreamRecord<IN>> priorityQueue = priorityQueueOperatorState.value();
-
-		if (priorityQueue == null) {
-			priorityQueue = priorityQueueFactory.createPriorityQueue();
-
-			priorityQueueOperatorState.update(priorityQueue);
-		}
-
-		return priorityQueue;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		keys.add(keySelector.getKey(element.getValue()));
-
-		super.processElement(element);
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		// iterate over all keys to trigger the execution of the buffered elements
-		for (KEY key: keys) {
-			setKeyContext(key);
-
-			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
-			NFA<IN> nfa = getNFA();
-
-			while (!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
-				StreamRecord<IN> streamRecord = priorityQueue.poll();
-
-				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
-			}
-		}
-
-		output.emitWatermark(mark);
-	}
-
-	@Override
-	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-
-		AbstractStateBackend.CheckpointStateOutputView ov = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-		ov.writeInt(keys.size());
-
-		for (KEY key: keys) {
-			keySerializer.serialize(key, ov);
-		}
-
-		taskState.setOperatorState(ov.closeAndGetHandle());
-
-		return taskState;
-	}
-
-	@Override
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
-
-		@SuppressWarnings("unchecked")
-		StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state;
-
-		DataInputView inputView = stateHandle.getState(getUserCodeClassloader());
+	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
+			event,
+			timestamp);
 
-		if (keys == null) {
-			keys = new HashSet<>();
-		}
-
-		int numberEntries = inputView.readInt();
-
-		for (int i = 0; i <numberEntries; i++) {
-			keys.add(keySerializer.deserialize(inputView));
-		}
-	}
-
-	/**
-	 * Custom type serializer implementation to serialize priority queues.
-	 *
-	 * @param <T> Type of the priority queue's elements
-	 */
-	private static class PriorityQueueSerializer<T> extends TypeSerializer<PriorityQueue<T>> {
-
-		private static final long serialVersionUID = -231980397616187715L;
-
-		private final TypeSerializer<T> elementSerializer;
-		private final PriorityQueueFactory<T> factory;
-
-		public PriorityQueueSerializer(final TypeSerializer<T> elementSerializer, final PriorityQueueFactory<T> factory) {
-			this.elementSerializer = elementSerializer;
-			this.factory = factory;
-		}
-
-		@Override
-		public boolean isImmutableType() {
-			return false;
-		}
-
-		@Override
-		public TypeSerializer<PriorityQueue<T>> duplicate() {
-			return new PriorityQueueSerializer<>(elementSerializer.duplicate(), factory);
-		}
-
-		@Override
-		public PriorityQueue<T> createInstance() {
-			return factory.createPriorityQueue();
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from) {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			for (T element: from) {
-				result.offer(elementSerializer.copy(element));
-			}
-
-			return result;
-		}
-
-		@Override
-		public PriorityQueue<T> copy(PriorityQueue<T> from, PriorityQueue<T> reuse) {
-			reuse.clear();
-
-			for (T element: from) {
-				reuse.offer(elementSerializer.copy(element));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public int getLength() {
-			return 0;
-		}
+		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
 
-		@Override
-		public void serialize(PriorityQueue<T> record, DataOutputView target) throws IOException {
-			target.writeInt(record.size());
+		if (!matchedPatterns.isEmpty()) {
+			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
+				null,
+				timestamp);
 
-			for (T element: record) {
-				elementSerializer.serialize(element, target);
+			for (Map<String, IN> pattern: matchedPatterns) {
+				streamRecord.replace(pattern);
+				output.collect(streamRecord);
 			}
 		}
-
-		@Override
-		public PriorityQueue<T> deserialize(DataInputView source) throws IOException {
-			PriorityQueue<T> result = factory.createPriorityQueue();
-
-			return deserialize(result, source);
-		}
-
-		@Override
-		public PriorityQueue<T> deserialize(PriorityQueue<T> reuse, DataInputView source) throws IOException {
-			reuse.clear();
-
-			int numberEntries = source.readInt();
-
-			for (int i = 0; i < numberEntries; i++) {
-				reuse.offer(elementSerializer.deserialize(source));
-			}
-
-			return reuse;
-		}
-
-		@Override
-		public void copy(DataInputView source, DataOutputView target) throws IOException {
-
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof PriorityQueueSerializer) {
-				@SuppressWarnings("unchecked")
-				PriorityQueueSerializer<T> other = (PriorityQueueSerializer<T>) obj;
-
-				return factory.equals(other.factory) && elementSerializer.equals(other.elementSerializer);
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public boolean canEqual(Object obj) {
-			return obj instanceof PriorityQueueSerializer;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(factory, elementSerializer);
-		}
-	}
-
-	private interface PriorityQueueFactory<T> extends Serializable {
-		PriorityQueue<T> createPriorityQueue();
-	}
-
-	private static class PriorityQueueStreamRecordFactory<T> implements PriorityQueueFactory<StreamRecord<T>> {
-
-		private static final long serialVersionUID = 1254766984454616593L;
-
-		@Override
-		public PriorityQueue<StreamRecord<T>> createPriorityQueue() {
-			return new PriorityQueue<StreamRecord<T>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<T>());
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
new file mode 100644
index 0000000..9b0c951
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutCEPPatternOperator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * CEP pattern operator which only returns fully matched event patterns and partially matched event
+ * patterns which have timed out wrapped in {@link Either}. The matched events are stored in a
+ * {@link Map} and are indexed by the event names associated in the pattern specification.
+ *
+ * The fully matched event patterns are returned as a {@link Either.Right} instance and the
+ * partially matched event patterns are returned as a {@link Either.Left} instance.
+ *
+ * @param <IN> Type of the input events
+ */
+public class TimeoutCEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
+	private static final long serialVersionUID = -3911002597290988201L;
+
+	public TimeoutCEPPatternOperator(
+		TypeSerializer<IN> inputSerializer,
+		boolean isProcessingTime,
+		NFACompiler.NFAFactory<IN> nfaFactory) {
+
+		super(inputSerializer, isProcessingTime, nfaFactory);
+	}
+
+	@Override
+	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
+			event,
+			timestamp);
+
+		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
+		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
+
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
+
+		if (!matchedPatterns.isEmpty()) {
+			for (Map<String, IN> matchedPattern : matchedPatterns) {
+				streamRecord.replace(Either.Right(matchedPattern));
+				output.collect(streamRecord);
+			}
+		}
+
+		if (!partialPatterns.isEmpty()) {
+			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
+				streamRecord.replace(Either.Left(partialPattern));
+				output.collect(streamRecord);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
new file mode 100644
index 0000000..23b1a91
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.Collection;
+import java.util.Map;
+
+public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOperator<IN, KEY, Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> {
+	private static final long serialVersionUID = 3570542177814518158L;
+
+	public TimeoutKeyedCEPPatternOperator(
+		TypeSerializer<IN> inputSerializer,
+		boolean isProcessingTime,
+		KeySelector<IN, KEY> keySelector,
+		TypeSerializer<KEY> keySerializer,
+		NFACompiler.NFAFactory<IN> nfaFactory) {
+
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory);
+	}
+
+	@Override
+	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
+		Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns = nfa.process(
+			event,
+			timestamp);
+
+		Collection<Map<String, IN>> matchedPatterns = patterns.f0;
+		Collection<Tuple2<Map<String, IN>, Long>> partialPatterns = patterns.f1;
+
+		StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>> streamRecord = new StreamRecord<Either<Tuple2<Map<String, IN>, Long>, Map<String, IN>>>(
+			null,
+			timestamp);
+
+		if (!matchedPatterns.isEmpty()) {
+			for (Map<String, IN> matchedPattern : matchedPatterns) {
+				streamRecord.replace(Either.Right(matchedPattern));
+				output.collect(streamRecord);
+			}
+		}
+
+		if (!partialPatterns.isEmpty()) {
+			for (Tuple2<Map<String, IN>, Long> partialPattern: partialPatterns) {
+				streamRecord.replace(Either.Left(partialPattern));
+				output.collect(streamRecord);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 0a287a2..29044d8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
+import org.apache.flink.types.Either;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -422,4 +423,88 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
+
+	@Test
+	public void testTimeoutHandling() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		// (Event, timestamp)
+		DataStream<Event> input = env.fromElements(
+			Tuple2.of(new Event(1, "start", 1.0), 1L),
+			Tuple2.of(new Event(1, "middle", 2.0), 5L),
+			Tuple2.of(new Event(1, "start", 2.0), 4L),
+			Tuple2.of(new Event(1, "end", 2.0), 6L)
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+
+			@Override
+			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
+				return element.f1;
+			}
+
+			@Override
+			public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
+				return new Watermark(lastElement.f1 - 5);
+			}
+
+		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+
+			@Override
+			public Event map(Tuple2<Event, Long> value) throws Exception {
+				return value.f0;
+			}
+		});
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new FilterFunction<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		}).within(Time.milliseconds(3));
+
+		DataStream<Either<String, String>> result = CEP.pattern(input, pattern).select(
+			new PatternTimeoutFunction<Event, String>() {
+				@Override
+				public String timeout(Map<String, Event> pattern, long timeoutTimestamp) throws Exception {
+					return pattern.get("start").getPrice() + "";
+				}
+			},
+			new PatternSelectFunction<Event, String>() {
+
+				@Override
+				public String select(Map<String, Event> pattern) {
+					StringBuilder builder = new StringBuilder();
+
+					builder.append(pattern.get("start").getPrice()).append(",")
+						.append(pattern.get("middle").getPrice()).append(",")
+						.append(pattern.get("end").getPrice());
+
+					return builder.toString();
+				}
+			}
+		);
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// the expected sequences of matching event ids
+		expected = "Left(1.0)\nRight(2.0,2.0,2.0)";
+
+		env.execute();
+
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 0a3dc7d..6152191 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.StreamEvent;
 import org.apache.flink.cep.SubEvent;
@@ -30,8 +31,11 @@ import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
@@ -77,13 +81,13 @@ public class NFAITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
 
 		for (StreamEvent<Event> inputEvent: inputEvents) {
 			Collection<Map<String, Event>> patterns = nfa.process(
 				inputEvent.getEvent(),
-				inputEvent.getTimestamp());
+				inputEvent.getTimestamp()).f0;
 
 			resultingPatterns.addAll(patterns);
 		}
@@ -141,10 +145,10 @@ public class NFAITCase extends TestLogger {
 		}).within(Time.milliseconds(10));
 
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer());
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
 		for (StreamEvent<Event> event: events) {
-			Collection<Map<String, Event>> patterns = nfa.process(event.getEvent(), event.getTimestamp());
+			Collection<Map<String, Event>> patterns = nfa.process(event.getEvent(), event.getTimestamp()).f0;
 
 			resultingPatterns.addAll(patterns);
 		}
@@ -157,4 +161,86 @@ public class NFAITCase extends TestLogger {
 		assertEquals(middleEvent, patternMap.get("middle"));
 		assertEquals(endEvent, patternMap.get("end"));
 	}
+
+	/**
+	 * Tests that the NFA successfully returns partially matched event sequences when they've timed
+	 * out.
+	 */
+	@Test
+	public void testSimplePatternWithTimeoutHandling() {
+		List<StreamEvent<Event>> events = new ArrayList<>();
+		List<Map<String, Event>> resultingPatterns = new ArrayList<>();
+		Set<Tuple2<Map<String, Event>, Long>> resultingTimeoutPatterns = new HashSet<>();
+		Set<Tuple2<Map<String, Event>, Long>> expectedTimeoutPatterns = new HashSet<>();
+
+
+		events.add(new StreamEvent<Event>(new Event(1, "start", 1.0), 1));
+		events.add(new StreamEvent<Event>(new Event(2, "start", 1.0), 2));
+		events.add(new StreamEvent<Event>(new Event(3, "middle", 1.0), 3));
+		events.add(new StreamEvent<Event>(new Event(4, "foobar", 1.0), 4));
+		events.add(new StreamEvent<Event>(new Event(5, "end", 1.0), 11));
+		events.add(new StreamEvent<Event>(new Event(6, "end", 1.0), 13));
+
+		Map<String, Event> timeoutPattern1 = new HashMap<>();
+		timeoutPattern1.put("start", new Event(1, "start", 1.0));
+		timeoutPattern1.put("middle", new Event(3, "middle", 1.0));
+
+		Map<String, Event> timeoutPattern2 = new HashMap<>();
+		timeoutPattern2.put("start", new Event(2, "start", 1.0));
+		timeoutPattern2.put("middle", new Event(3, "middle", 1.0));
+
+		Map<String, Event> timeoutPattern3 = new HashMap<>();
+		timeoutPattern3.put("start", new Event(1, "start", 1.0));
+
+		Map<String, Event> timeoutPattern4 = new HashMap<>();
+		timeoutPattern4.put("start", new Event(2, "start", 1.0));
+
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 7907391379273505897L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -3268741540234334074L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -8995174172182138608L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		}).within(Time.milliseconds(10));
+
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true);
+
+		for (StreamEvent<Event> event: events) {
+			Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns =
+				nfa.process(event.getEvent(), event.getTimestamp());
+
+			Collection<Map<String, Event>> matchedPatterns = patterns.f0;
+			Collection<Tuple2<Map<String, Event>, Long>> timeoutPatterns = patterns.f1;
+
+			resultingPatterns.addAll(matchedPatterns);
+			resultingTimeoutPatterns.addAll(timeoutPatterns);
+		}
+
+		assertEquals(1, resultingPatterns.size());
+		assertEquals(expectedTimeoutPatterns.size(), resultingTimeoutPatterns.size());
+
+		assertEquals(expectedTimeoutPatterns, resultingTimeoutPatterns);
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index a915dee..f48dab3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertEquals;
 public class NFATest extends TestLogger {
 	@Test
 	public void testSimpleNFA() {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false);
 		List<StreamEvent<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(StreamEvent.of(new Event(1, "start", 1.0), 1L));
@@ -197,7 +197,7 @@ public class NFATest extends TestLogger {
 		Set<Map<String, T>> actualPatterns = new HashSet<>();
 
 		for (StreamEvent<T> streamEvent: inputs) {
-			Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getEvent(), streamEvent.getTimestamp());
+			Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getEvent(), streamEvent.getTimestamp()).f0;
 
 			actualPatterns.addAll(matchedPatterns);
 		}
@@ -207,7 +207,7 @@ public class NFATest extends TestLogger {
 
 	@Test
 	public void testNFASerialization() throws IOException, ClassNotFoundException {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0);
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false);
 
 		State<Event> startingState = new State<>("", State.StateType.Start);
 		State<Event> startState = new State<>("start", State.StateType.Normal);
@@ -251,7 +251,7 @@ public class NFATest extends TestLogger {
 	}
 
 	private NFA<Event> createStartEndNFA(long windowLength) {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength);
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
 
 		State<Event> startingState = new State<>("", State.StateType.Start);
 		State<Event> startState = new State<>("start", State.StateType.Normal);

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index eb3ead1..c790c35 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -65,9 +65,9 @@ public class NFACompilerTest extends TestLogger {
 			}
 		});
 
-		TypeInformation<Event> typeInformation = (TypeInformation<Event>) TypeExtractor.createTypeInfo(Event.class);
+		TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
 
-		NFA<Event> nfa = NFACompiler.<Event>compile(pattern, typeInformation.createSerializer(new ExecutionConfig()));
+		NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false);
 
 		Set<State<Event>> states = nfa.getStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 589589b..d5ef5be 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -32,7 +32,6 @@ import static org.junit.Assert.*;
 
 import java.util.Map;
 
-
 public class CEPOperatorTest extends TestLogger {
 
 	@Test
@@ -106,7 +105,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		@Override
 		public NFA<T> createNFA() {
-			return new NFA<>(inputTypeSerializer.duplicate(), 0);
+			return new NFA<>(inputTypeSerializer.duplicate(), 0, false);
 		}
 	}
 }


[2/2] flink git commit: [FLINK-3317] [cep] Introduce timeout handler to CEP operator

Posted by tr...@apache.org.
[FLINK-3317] [cep] Introduce timeout handler to CEP operator

Introduce timeout handling flag for the NFACompiler

Expose timeout handling via Java API

Update documentation of PatternStream and CEP

Introduce timeout select function to CEP Scala API

Add select and flatSelect with timeout support to CEP Scala API

Add test cases for timeout handling

Update documentation

Fix CEP Scala API completeness test

This closes #2041.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57ef6c31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57ef6c31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57ef6c31

Branch: refs/heads/master
Commit: 57ef6c315ee7aa467d922dd4d1213dfd8bc74fb0
Parents: c78b3c4
Author: Till Rohrmann <tr...@apache.org>
Authored: Thu May 26 11:34:16 2016 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Jun 16 16:48:17 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/libs/cep.md                 |  68 +++-
 .../org/apache/flink/cep/CEPLambdaTest.java     |  19 +-
 .../apache/flink/cep/scala/PatternStream.scala  | 217 +++++++++++-
 ...StreamScalaJavaAPIInteroperabilityTest.scala | 130 +++++++
 ...nStreamScalaJavaAPIInteroperabiliyTest.scala |  87 -----
 .../src/main/java/org/apache/flink/cep/CEP.java |  65 +---
 .../flink/cep/PatternFlatTimeoutFunction.java   |  56 +++
 .../org/apache/flink/cep/PatternStream.java     | 236 ++++++++++++-
 .../flink/cep/PatternTimeoutFunction.java       |  56 +++
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  60 ++--
 .../flink/cep/nfa/compiler/NFACompiler.java     |  32 +-
 .../AbstractCEPBasePatternOperator.java         |  92 +++++
 .../operator/AbstractCEPPatternOperator.java    | 146 +++++---
 .../AbstractKeyedCEPPatternOperator.java        | 337 +++++++++++++++++++
 .../flink/cep/operator/CEPOperatorUtils.java    | 143 ++++++++
 .../flink/cep/operator/CEPPatternOperator.java  | 123 ++-----
 .../cep/operator/KeyedCEPPatternOperator.java   | 319 ++----------------
 .../cep/operator/TimeoutCEPPatternOperator.java |  79 +++++
 .../TimeoutKeyedCEPPatternOperator.java         |  72 ++++
 .../java/org/apache/flink/cep/CEPITCase.java    |  85 +++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  94 +++++-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   8 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |   4 +-
 .../flink/cep/operator/CEPOperatorTest.java     |   3 +-
 24 files changed, 1861 insertions(+), 670 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/docs/apis/streaming/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/libs/cep.md b/docs/apis/streaming/libs/cep.md
index d76aab7..d465519 100644
--- a/docs/apis/streaming/libs/cep.md
+++ b/docs/apis/streaming/libs/cep.md
@@ -435,7 +435,7 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<
 </div>
 
 <div data-lang="scala" markdown="1">
-The `select` method takes a section function as argument, which is called for each matching event sequence.
+The `select` method takes a selection function as argument, which is called for each matching event sequence.
 It receives a map of string/event pairs of the matched events.
 The string is defined by the name of the state to which the event has been matched.
 The selection function returns exactly one result per call.
@@ -463,6 +463,72 @@ def flatSelectFn(pattern : mutable.Map[String, IN], collector : Collector[OUT])
 </div>
 </div>
 
+### Handling Timed Out Partial Patterns
+
+Whenever a pattern has a window length associated via the `within` key word, it is possible that partial event patterns will be discarded because they exceed the window length.
+In order to react to these timeout events the `select` and `flatSelect` API calls allow to specify a timeout handler.
+This timeout handler is called for each partial event pattern which has timed out.
+The timeout handler receives all so far matched events of the partial pattern and the timestamp when the timeout was detected.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The return type of the timeout function can be different from the select function.
+The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively so that the resulting data stream is of type `org.apache.flink.types.Either`.
+
+{% highlight java %}
+PatternStream<Event> patternStream = CEP.pattern(input, pattern);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> result = patternStream.select(
+    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternSelectFunction<Event, ComplexEvent>() {...}
+);
+
+DataStream<Either<TimeoutEvent, ComplexEvent>> flatResult = patternStream.flatSelect(
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+);
+{% endhighlight %}
+
+</div>
+
+<div data-lang="scala" markdown="1">
+In order to treat partial patterns, the `select` API call offers an overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
+The timeout function is called with a map of string-event pairs of the partial match which has timed out and a long indicating when the timeout occurred.
+The string is defined by the name of the state to which the event has been matched.
+The timeout function returns exactly one result per call.
+The return type of the timeout function can be different from the select function.
+The timeout event and the select event are wrapped in `Left` and `Right` respectively so that the resulting data stream is of type `Either`.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.select{
+    (pattern: mutable.Map[String, Event], timestamp: Long) => TimeoutEvent()
+} {
+    pattern: mutable.Map[String, Event] => ComplexEvent()
+}
+{% endhighlight %}
+
+The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
+In contrast to the `select` functions, the `flatSelect` functions are called with an `Collector`.
+The collector can be used to emit an arbitrary number of events.
+
+{% highlight scala %}
+val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
+
+DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect{
+    (pattern: mutable.Map[String, Event], timestamp: Long, out: Collector[TimeoutEvent]) => 
+        out.collect(TimeoutEvent())
+} {
+    (pattern: mutable.Map[String, Event], out: Collector[ComplexEvent]) => 
+        out.collect(ComplexEvent())
+}
+{% endhighlight %}
+
+</div>
+</div>
+
 ## Examples
 
 The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
index 2e6fcd9..5957158 100644
--- a/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
+++ b/flink-java8/src/test/java/org/apache/flink/cep/CEPLambdaTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
@@ -44,18 +45,18 @@ public class CEPLambdaTest extends TestLogger {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
 
-		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String, EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
 
-		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+		DataStream<EventA> inputStream = new DataStream<>(
 			StreamExecutionEnvironment.getExecutionEnvironment(),
 			new SourceTransformation<>(
 				"source",
 				null,
-				inputTpeInformation,
+				eventTypeInformation,
 				1));
 
+		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
 
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
 
 		DataStream<EventB> result = patternStream.select(
 			map -> new EventB()
@@ -72,17 +73,17 @@ public class CEPLambdaTest extends TestLogger {
 		TypeInformation<EventA> eventTypeInformation = TypeExtractor.getForClass(EventA.class);
 		TypeInformation<EventB> outputTypeInformation = TypeExtractor.getForClass(EventB.class);
 
-		TypeInformation<Map<String, EventA>> inputTpeInformation = (TypeInformation<Map<String, EventA>>) (TypeInformation<?>) TypeInformation.of(Map.class);
-
-		DataStream<Map<String, EventA>> inputStream = new DataStream<>(
+		DataStream<EventA> inputStream = new DataStream<>(
 			StreamExecutionEnvironment.getExecutionEnvironment(),
 			new SourceTransformation<>(
 				"source",
 				null,
-				inputTpeInformation,
+				eventTypeInformation,
 				1));
 
-		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, eventTypeInformation);
+		Pattern<EventA, ?> dummyPattern = Pattern.begin("start");
+
+		PatternStream<EventA> patternStream = new PatternStream<>(inputStream, dummyPattern);
 
 		DataStream<EventB> result = patternStream.flatSelect(
 			(map, collector) -> collector.collect(new EventB())

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 22b105c..6207049 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -18,12 +18,19 @@
 package org.apache.flink.cep.scala
 
 import java.util.{Map => JMap}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.cep.{PatternFlatSelectFunction, PatternSelectFunction,
-PatternStream => JPatternStream}
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.util.Collector
+import org.apache.flink.types.{Either => FEither}
+import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
+import java.lang.{Long => JLong}
+
+import org.apache.flink.cep.operator.CEPOperatorUtils
+import org.apache.flink.cep.scala.pattern.Pattern
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -40,6 +47,10 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
   private[flink] def wrappedPatternStream = jPatternStream
 
+  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
+
+  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
+
   /**
     * Applies a select function to the detected pattern sequence. For each pattern sequence the
     * provided [[PatternSelectFunction]] is called. The pattern select function can produce
@@ -56,6 +67,55 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
   /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting timeout event.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and resulting timeout
+    *         events.
+    */
+  def select[L: TypeInformation, R: TypeInformation](
+    patternTimeoutFunction: PatternTimeoutFunction[T, L],
+    patternSelectFunction: PatternSelectFunction[T, R])
+  : DataStream[Either[L, R]] = {
+
+    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
+      jPatternStream.getInputStream(),
+      jPatternStream.getPattern())
+
+    val cleanedSelect = cleanClosure(patternSelectFunction)
+    val cleanedTimeout = cleanClosure(patternTimeoutFunction)
+
+    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
+
+    asScalaStream(patternStream).map[Either[L, R]] {
+     input: FEither[FTuple2[JMap[String, T], JLong], JMap[String, T]] =>
+       if (input.isLeft) {
+         val timeout = input.left()
+         val timeoutEvent = cleanedTimeout.timeout(timeout.f0, timeout.f1)
+         val t = Left[L, R](timeoutEvent)
+         t
+       } else {
+         val event = cleanedSelect.select(input.right())
+         val t = Right[L, R](event)
+         t
+       }
+    }
+  }
+
+  /**
     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
     * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
     * produce an arbitrary number of resulting elements.
@@ -69,7 +129,63 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   def flatSelect[R: TypeInformation](patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
   : DataStream[R] = {
     asScalaStream(jPatternStream
-      .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
+                    .flatSelect(patternFlatSelectFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and the resulting
+    *         timeout events wrapped in a [[Either]] type.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](
+    patternFlatTimeoutFunction: PatternFlatTimeoutFunction[T, L],
+    patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
+  : DataStream[Either[L, R]] = {
+    val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
+      jPatternStream.getInputStream(),
+      jPatternStream.getPattern()
+    )
+
+    val cleanedSelect = cleanClosure(patternFlatSelectFunction)
+    val cleanedTimeout = cleanClosure(patternFlatTimeoutFunction)
+
+    implicit val eitherTypeInfo = createTypeInformation[Either[L, R]]
+
+    asScalaStream(patternStream).flatMap[Either[L, R]] {
+      (input: FEither[FTuple2[JMap[String, T], JLong], JMap[String, T]],
+        collector: Collector[Either[L, R]]) =>
+
+        if (input.isLeft()) {
+          val timeout = input.left()
+
+          cleanedTimeout.timeout(timeout.f0, timeout.f1, new Collector[L]() {
+            override def collect(record: L): Unit = collector.collect(Left(record))
+
+            override def close(): Unit = collector.close()
+          })
+        } else {
+          cleanedSelect.flatSelect(input.right, new Collector[R]() {
+            override def collect(record: R): Unit = collector.collect(Right(record))
+
+            override def close(): Unit = collector.close()
+          })
+        }
+    }
   }
 
   /**
@@ -83,8 +199,9 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     * @return [[DataStream]] which contains the resulting elements from the pattern select function.
     */
   def select[R: TypeInformation](patternSelectFun: mutable.Map[String, T] => R): DataStream[R] = {
+    val cleanFun = cleanClosure(patternSelectFun)
+
     val patternSelectFunction: PatternSelectFunction[T, R] = new PatternSelectFunction[T, R] {
-      val cleanFun = cleanClosure(patternSelectFun)
 
       def select(in: JMap[String, T]): R = cleanFun(in.asScala)
     }
@@ -92,6 +209,46 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   }
 
   /**
+    * Applies a select function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternSelectFunction]] is called. The pattern select function can produce
+    * exactly one resulting element.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternTimeoutFunction]] is called. The pattern
+    * timeout function has to produce exactly one resulting element.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+    *                               pattern sequence which has timed out.
+    * @param patternSelectFunction  The pattern select function which is called for each detected
+    *                               pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contain the resulting events and resulting timeout
+    *         events.
+    */
+  def select[L: TypeInformation, R: TypeInformation](
+      patternTimeoutFunction: (mutable.Map[String, T], Long) => L) (
+      patternSelectFunction: mutable.Map[String, T] => R)
+    : DataStream[Either[L, R]] = {
+
+    val cleanSelectFun = cleanClosure(patternSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternTimeoutFunction)
+
+    val patternSelectFun = new PatternSelectFunction[T, R] {
+      override def select(pattern: JMap[String, T]): R = cleanSelectFun(pattern.asScala)
+    }
+    val patternTimeoutFun = new PatternTimeoutFunction[T, L] {
+      override def timeout(pattern: JMap[String, T], timeoutTimestamp: Long): L = {
+        cleanTimeoutFun(pattern.asScala, timeoutTimestamp)
+      }
+    }
+
+    select(patternTimeoutFun, patternSelectFun)
+  }
+
+  /**
     * Applies a flat select function to the detected pattern sequence. For each pattern sequence
     * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function
     * can produce an arbitrary number of resulting elements.
@@ -104,9 +261,10 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     */
   def flatSelect[R: TypeInformation](patternFlatSelectFun: (mutable.Map[String, T],
     Collector[R]) => Unit): DataStream[R] = {
+    val cleanFun = cleanClosure(patternFlatSelectFun)
+
     val patternFlatSelectFunction: PatternFlatSelectFunction[T, R] =
       new PatternFlatSelectFunction[T, R] {
-        val cleanFun = cleanClosure(patternFlatSelectFun)
 
         def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit =
           cleanFun(pattern.asScala, out)
@@ -114,6 +272,51 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
     flatSelect(patternFlatSelectFunction)
   }
 
+  /**
+    * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+    * the provided [[PatternFlatSelectFunction]] is called. The pattern flat select function can
+    * produce an arbitrary number of resulting elements.
+    *
+    * Additionally a timeout function is applied to partial event patterns which have timed out. For
+    * each partial pattern sequence the provided [[PatternFlatTimeoutFunction]] is called. The
+    * pattern timeout function can produce an arbitrary number of resulting timeout events.
+    *
+    * The resulting event and the resulting timeout event are wrapped in an [[Either]] instance.
+    *
+    * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+    *                                   partially matched pattern sequence which has timed out.
+    * @param patternFlatSelectFunction  The pattern flat select function which is called for each
+    *                                   detected pattern sequence.
+    * @tparam L Type of the resulting timeout event
+    * @tparam R Type of the resulting event
+    * @return Data stream of either type which contains the resulting events and the resulting
+    *         timeout events wrapped in a [[Either]] type.
+    */
+  def flatSelect[L: TypeInformation, R: TypeInformation](
+      patternFlatTimeoutFunction: (mutable.Map[String, T], Long, Collector[L]) => Unit) (
+      patternFlatSelectFunction: (mutable.Map[String, T], Collector[R]) => Unit)
+    : DataStream[Either[L, R]] = {
+
+    val cleanSelectFun = cleanClosure(patternFlatSelectFunction)
+    val cleanTimeoutFun = cleanClosure(patternFlatTimeoutFunction)
+
+    val patternFlatSelectFun = new PatternFlatSelectFunction[T, R] {
+      override def flatSelect(pattern: JMap[String, T], out: Collector[R]): Unit = {
+        cleanSelectFun(pattern.asScala, out)
+      }
+    }
+
+    val patternFlatTimeoutFun = new PatternFlatTimeoutFunction[T, L] {
+      override def timeout(
+        pattern: JMap[String, T],
+        timeoutTimestamp: Long, out: Collector[L])
+      : Unit = {
+        cleanTimeoutFun(pattern.asScala, timeoutTimestamp, out)
+      }
+    }
+
+    flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
+  }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
new file mode 100644
index 0000000..6fe68c8
--- /dev/null
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.scala
+
+import org.apache.flink.api.common.functions.util.ListCollector
+import org.apache.flink.cep.scala.pattern.Pattern
+import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util.{Collector, TestLogger}
+import org.apache.flink.types.{Either => FEither}
+import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
+
+import java.lang.{Long => JLong}
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import org.junit.Assert._
+import org.junit.Test
+
+class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPISelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
+    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
+    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
+    val param = mutable.Map("begin" ->(1, 2)).asJava
+    val result: DataStream[(Int, Int)] = pStream
+      .select((pattern: mutable.Map[String, (Int, Int)]) => {
+        //verifies input parameter forwarding
+        assertEquals(param, pattern.asJava)
+        param.get("begin")
+      })
+    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
+      .getUserFunction.map(param)
+    //verifies output parameter forwarding
+    assertEquals(param.get("begin"), out)
+  }
+
+  @Test
+  @throws[Exception]
+  def testScalaJavaAPIFlatSelectFunForwarding {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
+    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
+    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
+    val inList = List(1, 2, 3)
+    val inParam = mutable.Map("begin" -> inList).asJava
+    val outList = new java.util.ArrayList[List[Int]]
+    val outParam = new ListCollector[List[Int]](outList)
+
+    val result: DataStream[List[Int]] = pStream
+
+      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
+        //verifies input parameter forwarding
+        assertEquals(inParam, pattern.asJava)
+        out.collect(pattern.get("begin").get)
+      })
+
+    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
+      getUserFunction.flatMap(inParam, outParam)
+    //verify output parameter forwarding and that flatMap function was actually called
+    assertEquals(inList, outList.get(0))
+  }
+
+  @Test
+  @throws[Exception]
+  def testTimeoutHandling: Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val dummyDataStream: DataStream[String] = env.fromElements()
+    val pattern: Pattern[String, _] = Pattern.begin[String]("dummy")
+    val pStream: PatternStream[String] = CEP.pattern(dummyDataStream, pattern)
+    val inParam = mutable.Map("begin" -> "barfoo").asJava
+    val outList = new java.util.ArrayList[Either[String, String]]
+    val output = new ListCollector[Either[String, String]](outList)
+    val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
+      .asJava
+
+    val result: DataStream[Either[String, String]] = pStream.flatSelect {
+        (pattern: mutable.Map[String, String], timestamp: Long, out: Collector[String]) =>
+          out.collect("timeout")
+          out.collect(pattern("begin"))
+      } {
+        (pattern: mutable.Map[String, String], out: Collector[String]) =>
+          //verifies input parameter forwarding
+          assertEquals(inParam, pattern.asJava)
+          out.collect("match")
+          out.collect(pattern("begin"))
+      }
+
+    val fun = extractUserFunction[
+      StreamFlatMap[
+        FEither[
+          FTuple2[JMap[String, String], JLong],
+          JMap[String, String]],
+        Either[String, String]]](result)
+
+    fun.getUserFunction.flatMap(FEither.Right(inParam), output)
+    fun.getUserFunction.flatMap(FEither.Left(FTuple2.of(inParam, 42L)), output)
+
+    assertEquals(expectedOutput, outList)
+  }
+
+  def extractUserFunction[T](dataStream: DataStream[_]) = {
+    dataStream.javaStream
+      .getTransformation
+      .asInstanceOf[OneInputTransformation[_, _]]
+      .getOperator
+      .asInstanceOf[T]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
deleted file mode 100644
index 7daebfe..0000000
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabiliyTest.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.scala
-
-import org.apache.flink.api.common.functions.util.ListCollector
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.util.{Collector, TestLogger}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import org.junit.Assert._
-import org.junit.Test
-
-class PatternStreamScalaJavaAPIInteroperabiliyTest extends TestLogger {
-
-  @Test
-  @throws[Exception]
-  def testScalaJavaAPISelectFunForwarding {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-    val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
-    val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
-    val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
-    val param = mutable.Map("begin" ->(1, 2)).asJava
-    val result: DataStream[(Int, Int)] = pStream
-      .select((pattern: mutable.Map[String, (Int, Int)]) => {
-        //verifies input parameter forwarding
-        assertEquals(param, pattern.asJava)
-        param.get("begin")
-      })
-    val out = extractUserFunction[StreamMap[java.util.Map[String, (Int, Int)], (Int, Int)]](result)
-      .getUserFunction.map(param)
-    //verifies output parameter forwarding
-    assertEquals(param.get("begin"), out)
-  }
-
-  @Test
-  @throws[Exception]
-  def testScalaJavaAPIFlatSelectFunForwarding {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-    val dummyDataStream: DataStream[List[Int]] = env.fromElements()
-    val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
-    val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
-    val inList = List(1, 2, 3)
-    val inParam = mutable.Map("begin" -> inList).asJava
-    val outList = new java.util.ArrayList[List[Int]]
-    val outParam = new ListCollector[List[Int]](outList)
-
-    val result: DataStream[List[Int]] = pStream
-
-      .flatSelect((pattern: mutable.Map[String, List[Int]], out: Collector[List[Int]]) => {
-        //verifies input parameter forwarding
-        assertEquals(inParam, pattern.asJava)
-        out.collect(pattern.get("begin").get)
-      })
-
-    extractUserFunction[StreamFlatMap[java.util.Map[String, List[Int]], List[Int]]](result).
-      getUserFunction.flatMap(inParam, outParam)
-    //verify output parameter forwarding and that flatMap function was actually called
-    assertEquals(inList, outList.get(0))
-  }
-
-  def extractUserFunction[T](dataStream: DataStream[_]) = {
-    dataStream.javaStream
-      .getTransformation
-      .asInstanceOf[OneInputTransformation[_, _]]
-      .getOperator
-      .asInstanceOf[T]
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index 60e0bf8..9ce9f77 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -18,20 +18,8 @@
 
 package org.apache.flink.cep;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.operator.CEPPatternOperator;
-import org.apache.flink.cep.operator.KeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-
-import java.util.Map;
 
 /**
  * Utility class for complex event processing.
@@ -39,62 +27,15 @@ import java.util.Map;
  * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
  */
 public class CEP {
-	private static final String PATTERN_OPERATOR_NAME = "AbstractCEPPatternOperator";
-
 	/**
-	 * Transforms a {@link DataStream<T>} into a {@link PatternStream<T>}. The PatternStream detects
-	 * the provided event pattern and emits the patterns as a {@link Map<String, T>} where each event
-	 * is identified by a String. The String is the name of the {@link State <T>} to which the event
-	 * has been associated.
-	 *
-	 * Depending on the input {@link DataStream<T>} type, keyed vs. non-keyed, a different
-	 * {@link org.apache.flink.cep.operator.AbstractCEPPatternOperator<T>} is instantiated.
+	 * Creates a {@link PatternStream} from an input data stream and a pattern.
 	 *
 	 * @param input DataStream containing the input events
 	 * @param pattern Pattern specification which shall be detected
 	 * @param <T> Type of the input events
-	 * @param <K> Type of the key in case of a KeyedStream (necessary to bind keySelector and
-	 *            keySerializer to the same type)
 	 * @return Resulting pattern stream
 	 */
-	public static <T, K> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
-		final TypeSerializer<T> inputSerializer = input.getType().createSerializer(input.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
-		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer);
-
-		final DataStream<Map<String, T>> patternStream;
-
-		if (input instanceof KeyedStream) {
-			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) input;
-
-			KeySelector<T, K> keySelector = keyedStream.getKeySelector();
-			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
-
-			patternStream = keyedStream.transform(
-				PATTERN_OPERATOR_NAME,
-				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-				new KeyedCEPPatternOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					keySelector,
-					keySerializer,
-					nfaFactory));
-		} else {
-			patternStream = input.transform(
-				PATTERN_OPERATOR_NAME,
-				(TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class),
-				new CEPPatternOperator<T>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory
-				)).setParallelism(1);
-		}
-
-		return new PatternStream<>(patternStream, input.getType());
+	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
+		return new PatternStream<>(input, pattern);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
new file mode 100644
index 0000000..661d32a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternFlatTimeoutFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern timeout function which can produce multiple resulting elements. A
+ * pattern flat timeout function is called with a map of partial events which are identified by
+ * their names and the timestamp when the timeout occurred. The names are defined by the
+ * {@link org.apache.flink.cep.pattern.Pattern} specifying the sought-after pattern.
+ * Additionally, a collector is provided as a parameter. The collector is used to emit an arbitrary
+ * number of resulting elements.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...
+ *
+ * DataStream<OUT> result = pattern.flatSelect(..., new MyPatternFlatTimeoutFunction());
+ * }</pre>
+ * @param <IN>
+ * @param <OUT>
+ */
+public interface PatternFlatTimeoutFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates zero or more resulting timeout elements given a map of partial pattern events and
+	 * the timestamp of the timeout. The events are identified by their specified names.
+	 *
+	 * @param pattern Map containing the partial pattern. Events are identified by their names.
+	 * @param timeoutTimestamp Timestamp when the timeout occurred
+	 * @param out Collector used to output the generated elements
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 * 					 operation to fail and may trigger recovery.
+	 */
+	void timeout(Map<String, IN> pattern, long timeoutTimestamp, Collector<OUT> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 57c5a9b..efcd16c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -21,8 +21,13 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.operator.CEPOperatorUtils;
+import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
 
 import java.util.Map;
@@ -33,18 +38,29 @@ import java.util.Map;
  * {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
  * has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}.
  *
+ * Additionally it allows to handle partially matched event patterns which have timed out. For this
+ * the user has to specify a {@link PatternTimeoutFunction} or a {@link PatternFlatTimeoutFunction}.
+ *
  * @param <T> Type of the events
  */
 public class PatternStream<T> {
 
 	// underlying data stream
-	private final DataStream<Map<String, T>> patternStream;
-	// type information of input type T
-	private final TypeInformation<T> inputType;
+	private final DataStream<T> inputStream;
+
+	private final Pattern<T, ?> pattern;
+
+	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
+		this.inputStream = inputStream;
+		this.pattern = pattern;
+	}
 
-	PatternStream(final DataStream<Map<String, T>> patternStream, final TypeInformation<T> inputType) {
-		this.patternStream = patternStream;
-		this.inputType = inputType;
+	public Pattern<T, ?> getPattern() {
+		return pattern;
+	}
+
+	public DataStream<T> getInputStream() {
+		return inputStream;
 	}
 
 	/**
@@ -67,7 +83,7 @@ public class PatternStream<T> {
 			PatternSelectFunction.class,
 			1,
 			-1,
-			inputType,
+			inputStream.getType(),
 			null,
 			false);
 
@@ -87,6 +103,8 @@ public class PatternStream<T> {
 	 *         function.
 	 */
 	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
+		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+
 		return patternStream.map(
 			new PatternSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternSelectFunction)))
@@ -94,13 +112,65 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
+	 * exactly one resulting element.
+	 *
+	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
+	 * timeout function can produce exactly one resulting element.
+	 *
+	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
+	 *                               pattern sequence which has timed out.
+	 * @param patternSelectFunction The pattern select function which is called for each detected
+	 *                              pattern sequence.
+	 * @param <L> Type of the resulting timeout elements
+	 * @param <R> Type of the resulting elements
+	 * @return {@link DataStream} which contains the resulting elements or the resulting timeout
+	 * elements wrapped in an {@link Either} type.
+	 */
+	public <L, R> DataStream<Either<L, R>> select(
+		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+		final PatternSelectFunction<T, R> patternSelectFunction) {
+
+		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+
+		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternTimeoutFunction,
+			PatternTimeoutFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternSelectFunction,
+			PatternSelectFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+
+		return patternStream.map(
+			new PatternSelectTimeoutMapper<>(
+				patternStream.getExecutionEnvironment().clean(patternSelectFunction),
+				patternStream.getExecutionEnvironment().clean(patternTimeoutFunction)
+			)
+		).returns(outTypeInfo);
+	}
+
+	/**
 	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
 	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
 	 * can produce an arbitrary number of resulting elements.
 	 *
 	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
 	 *                                  detected pattern sequence.
-	 * @param <R> Typ of the resulting elements
+	 * @param <R> Type of the resulting elements
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
@@ -112,7 +182,7 @@ public class PatternStream<T> {
 			PatternFlatSelectFunction.class,
 			1,
 			0,
-			inputType,
+			inputStream.getType(),
 			null,
 			false);
 
@@ -126,12 +196,14 @@ public class PatternStream<T> {
 	 *
 	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
 	 *                                  detected pattern sequence.
-	 * @param <R> Typ of the resulting elements
+	 * @param <R> Type of the resulting elements
 	 * @param outTypeInfo Explicit specification of output type.
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
 	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
+		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+
 		return patternStream.flatMap(
 			new PatternFlatSelectMapper<>(
 				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction)
@@ -139,6 +211,59 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Applies a flat select function to the detected pattern sequence. For each pattern sequence
+	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
+	 * can produce an arbitrary number of resulting elements.
+	 *
+	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The
+	 * pattern timeout function can produce an arbitrary number of resulting elements.
+	 *
+	 * @param patternFlatTimeoutFunction The pattern flat timeout function which is called for each
+	 *                                   partial pattern sequence which has timed out.
+	 * @param patternFlatSelectFunction The pattern flat select function which is called for each
+	 *                                  detected pattern sequence.
+	 * @param <L> Type of the resulting timeout events
+	 * @param <R> Type of the resulting events
+	 * @return {@link DataStream} which contains the resulting events from the pattern flat select
+	 * function or the resulting timeout events from the pattern flat timeout function wrapped in an
+	 * {@link Either} type.
+	 */
+	public <L, R> DataStream<Either<L, R>> flatSelect(
+		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+
+		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+
+		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternFlatTimeoutFunction,
+			PatternFlatTimeoutFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+			patternFlatSelectFunction,
+			PatternFlatSelectFunction.class,
+			1,
+			-1,
+			inputStream.getType(),
+			null,
+			false);
+
+		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+
+		return patternStream.flatMap(
+			new PatternFlatSelectTimeoutWrapper<>(
+				patternStream.getExecutionEnvironment().clean(patternFlatSelectFunction),
+				patternStream.getExecutionEnvironment().clean(patternFlatTimeoutFunction)
+			)
+		).returns(outTypeInfo);
+	}
+
+	/**
 	 * Wrapper for a {@link PatternSelectFunction}.
 	 *
 	 * @param <T> Type of the input elements
@@ -159,6 +284,97 @@ public class PatternStream<T> {
 		}
 	}
 
+	private static class PatternSelectTimeoutMapper<T, L, R> implements MapFunction<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>, Either<L, R>> {
+
+		private static final long serialVersionUID = 8259477556738887724L;
+
+		private final PatternSelectFunction<T, R> patternSelectFunction;
+		private final PatternTimeoutFunction<T, L> patternTimeoutFunction;
+
+		public PatternSelectTimeoutMapper(
+			PatternSelectFunction<T, R> patternSelectFunction,
+			PatternTimeoutFunction<T, L> patternTimeoutFunction) {
+
+			this.patternSelectFunction = patternSelectFunction;
+			this.patternTimeoutFunction = patternTimeoutFunction;
+		}
+
+		@Override
+		public Either<L, R> map(Either<Tuple2<Map<String, T>, Long>, Map<String, T>> value) throws Exception {
+			if (value.isLeft()) {
+				Tuple2<Map<String, T>, Long> timeout = value.left();
+
+				return Either.Left(patternTimeoutFunction.timeout(timeout.f0, timeout.f1));
+			} else {
+				return Either.Right(patternSelectFunction.select(value.right()));
+			}
+		}
+	}
+
+	private static class PatternFlatSelectTimeoutWrapper<T, L, R> implements FlatMapFunction<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>, Either<L, R>> {
+
+		private static final long serialVersionUID = 7483674669662261667L;
+
+		private final PatternFlatSelectFunction<T, R> patternFlatSelectFunction;
+		private final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction;
+
+		public PatternFlatSelectTimeoutWrapper(
+			PatternFlatSelectFunction<T, R> patternFlatSelectFunction,
+			PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction) {
+			this.patternFlatSelectFunction = patternFlatSelectFunction;
+			this.patternFlatTimeoutFunction = patternFlatTimeoutFunction;
+		}
+
+		@Override
+		public void flatMap(Either<Tuple2<Map<String, T>, Long>, Map<String, T>> value, Collector<Either<L, R>> out) throws Exception {
+			if (value.isLeft()) {
+				Tuple2<Map<String, T>, Long> timeout = value.left();
+
+				patternFlatTimeoutFunction.timeout(timeout.f0, timeout.f1, new LeftCollector<>(out));
+			} else {
+				patternFlatSelectFunction.flatSelect(value.right(), new RightCollector(out));
+			}
+		}
+
+		private static class LeftCollector<L, R> implements Collector<L> {
+
+			private final Collector<Either<L, R>> out;
+
+			private LeftCollector(Collector<Either<L, R>> out) {
+				this.out = out;
+			}
+
+			@Override
+			public void collect(L record) {
+				out.collect(Either.<L, R>Left(record));
+			}
+
+			@Override
+			public void close() {
+				out.close();
+			}
+		}
+
+		private static class RightCollector<L, R> implements Collector<R> {
+
+			private final Collector<Either<L, R>> out;
+
+			private RightCollector(Collector<Either<L, R>> out) {
+				this.out = out;
+			}
+
+			@Override
+			public void collect(R record) {
+				out.collect(Either.<L, R>Right(record));
+			}
+
+			@Override
+			public void close() {
+				out.close();
+			}
+		}
+	}
+
 	/**
 	 * Wrapper for a {@link PatternFlatSelectFunction}.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
new file mode 100644
index 0000000..974d6df
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternTimeoutFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Base interface for a pattern timeout function. A pattern timeout function is called with a
+ * map containing the timed out partial events which can be accessed by their names and the
+ * timestamp when the timeout occurred. The names depend on the definition of the
+ * {@link org.apache.flink.cep.pattern.Pattern}. The timeout method returns exactly one result. If
+ * you want to return more than one result, then you have to implement a
+ * {@link PatternFlatTimeoutFunction}.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...;
+ *
+ * DataStream<OUT> result = pattern.select(..., new MyPatternTimeoutFunction());
+ *}</pre>
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public interface PatternTimeoutFunction<IN, OUT> extends Function, Serializable {
+
+	/**
+	 * Generates a timeout result from the given map of events and timeout timestamp. The partial
+	 * events are identified by their names. Only one resulting element can be generated.
+	 *
+	 * @param pattern Map containing the found partial pattern. Events are identified by their names
+	 * @param timeoutTimestamp Timestamp of the timeout
+	 * @return Resulting timeout element
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 * 					 operation to fail and may trigger recovery.
+	 */
+	OUT timeout(Map<String, IN> pattern, long timeoutTimestamp) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5824264..f769a2b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import com.google.common.collect.LinkedHashMultimap;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -34,7 +35,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
@@ -71,15 +71,22 @@ public class NFA<T> implements Serializable {
 	// Length of the window
 	private final long windowTime;
 
+	private final boolean handleTimeout;
+
 	// Current starting index for the next dewey version number
 	private int startEventCounter;
 
 	// Current set of computation states within the state machine
 	private transient Queue<ComputationState<T>> computationStates;
 
-	public NFA(final TypeSerializer<T> eventSerializer, final long windowTime) {
+	public NFA(
+		final TypeSerializer<T> eventSerializer,
+		final long windowTime,
+		final boolean handleTimeout) {
+
 		this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
 		this.windowTime = windowTime;
+		this.handleTimeout = handleTimeout;
 		sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
 		computationStates = new LinkedList<>();
 
@@ -107,16 +114,19 @@ public class NFA<T> implements Serializable {
 
 	/**
 	 * Processes the next input event. If some of the computations reach a final state then the
-	 * resulting event sequences are returned.
+	 * resulting event sequences are returned. If computations time out and timeout handling is
+	 * activated, then the timed out event patterns are returned.
 	 *
 	 * @param event The current event to be processed
 	 * @param timestamp The timestamp of the current event
-	 * @return The collection of matched patterns (e.g. the result of computations which have
-	 * reached a final state)
+	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
+	 * reached a final state) and the collection of timed out patterns (if timeout handling is
+	 * activated)
 	 */
-	public Collection<Map<String, T>> process(final T event, final long timestamp) {
+	public Tuple2<Collection<Map<String, T>>, Collection<Tuple2<Map<String, T>, Long>>> process(final T event, final long timestamp) {
 		final int numberComputationStates = computationStates.size();
-		final List<Map<String, T>> result = new ArrayList<>();
+		final Collection<Map<String, T>> result = new ArrayList<>();
+		final Collection<Tuple2<Map<String, T>, Long>> timeoutResult = new ArrayList<>();
 
 		// iterate over all current computations
 		for (int i = 0; i < numberComputationStates; i++) {
@@ -127,6 +137,16 @@ public class NFA<T> implements Serializable {
 			if (!computationState.isStartState() &&
 				windowTime > 0 &&
 				timestamp - computationState.getStartTimestamp() >= windowTime) {
+
+				if (handleTimeout) {
+					// extract the timed out event patterns
+					Collection<Map<String, T>> timeoutPatterns = extractPatternMatches(computationState);
+
+					for (Map<String, T> timeoutPattern : timeoutPatterns) {
+						timeoutResult.add(Tuple2.of(timeoutPattern, timestamp));
+					}
+				}
+
 				// remove computation state which has exceeded the window length
 				sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
 				sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
@@ -150,24 +170,24 @@ public class NFA<T> implements Serializable {
 					computationStates.add(newComputationState);
 				}
 			}
+		}
 
-			// prune shared buffer based on window length
-			if(windowTime > 0) {
-				long pruningTimestamp = timestamp - windowTime;
-
-				// sanity check to guard against underflows
-				if (pruningTimestamp >= timestamp) {
-					throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
-						" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
-						" set correctly (e.g. Long.MIN_VALUE).");
-				}
+		// prune shared buffer based on window length
+		if(windowTime > 0) {
+			long pruningTimestamp = timestamp - windowTime;
 
-				// remove all elements which are expired with respect to the window length
-				sharedBuffer.prune(pruningTimestamp);
+			// sanity check to guard against underflows
+			if (pruningTimestamp >= timestamp) {
+				throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
+					" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
+					" set correctly (e.g. Long.MIN_VALUE).");
 			}
+
+			// remove all elements which are expired with respect to the window length
+			sharedBuffer.prune(pruningTimestamp);
 		}
 
-		return result;
+		return Tuple2.of(result, timeoutResult);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index f2561d4..878e0b2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -48,11 +48,15 @@ public class NFACompiler {
 	 *
 	 * @param pattern Definition of sequence pattern
 	 * @param inputTypeSerializer Serializer for the input type
+	 * @param timeoutHandling True if the NFA shall return timed out event patterns
 	 * @param <T> Type of the input events
 	 * @return Non-deterministic finite automaton representing the given pattern
 	 */
-	public static <T> NFA<T> compile(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
-		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer);
+	public static <T> NFA<T> compile(
+		Pattern<T, ?> pattern,
+		TypeSerializer<T> inputTypeSerializer,
+		boolean timeoutHandling) {
+		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer, timeoutHandling);
 
 		return factory.createNFA();
 	}
@@ -63,14 +67,18 @@ public class NFACompiler {
 	 *
 	 * @param pattern Definition of sequence pattern
 	 * @param inputTypeSerializer Serializer for the input type
+	 * @param timeoutHandling True if the NFA shall return timed out event patterns
 	 * @param <T> Type of the input events
 	 * @return Factory for NFAs corresponding to the given pattern
 	 */
 	@SuppressWarnings("unchecked")
-	public static <T> NFAFactory<T> compileFactory(Pattern<T, ?> pattern, TypeSerializer<T> inputTypeSerializer) {
+	public static <T> NFAFactory<T> compileFactory(
+		Pattern<T, ?> pattern,
+		TypeSerializer<T> inputTypeSerializer,
+		boolean timeoutHandling) {
 		if (pattern == null) {
 			// return a factory for empty NFAs
-			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList());
+			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
 		} else {
 			// set of all generated states
 			Map<String, State<T>> states = new HashMap<>();
@@ -137,10 +145,7 @@ public class NFACompiler {
 				(FilterFunction<T>) currentPattern.getFilterFunction()
 			));
 
-			NFA<T> nfa = new NFA<T>(inputTypeSerializer, windowTime);
-			nfa.addStates(states.values());
-
-			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()));
+			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
 		}
 	}
 
@@ -168,16 +173,23 @@ public class NFACompiler {
 		private final TypeSerializer<T> inputTypeSerializer;
 		private final long windowTime;
 		private final Collection<State<T>> states;
+		private final boolean timeoutHandling;
+
+		private NFAFactoryImpl(
+			TypeSerializer<T> inputTypeSerializer,
+			long windowTime,
+			Collection<State<T>> states,
+			boolean timeoutHandling) {
 
-		private NFAFactoryImpl(TypeSerializer<T> inputTypeSerializer, long windowTime, Collection<State<T>> states) {
 			this.inputTypeSerializer = inputTypeSerializer;
 			this.windowTime = windowTime;
 			this.states = states;
+			this.timeoutHandling = timeoutHandling;
 		}
 
 		@Override
 		public NFA<T> createNFA() {
-			NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime);
+			NFA<T> result =  new NFA<>(inputTypeSerializer.duplicate(), windowTime, timeoutHandling);
 
 			result.addStates(states);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
new file mode 100644
index 0000000..44649ac
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPBasePatternOperator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.PriorityQueue;
+
+/**
+ * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event
+ * patterns. The detected event patterns are then outputted to the down stream operators.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type fo the output elements
+ */
+public abstract class AbstractCEPBasePatternOperator<IN, OUT>
+	extends AbstractStreamOperator<OUT>
+	implements OneInputStreamOperator<IN, OUT> {
+
+	private static final long serialVersionUID = -4166778210774160757L;
+
+	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+
+	private final TypeSerializer<IN> inputSerializer;
+	private final boolean isProcessingTime;
+
+	public AbstractCEPBasePatternOperator(
+			final TypeSerializer<IN> inputSerializer,
+			final boolean isProcessingTime) {
+		this.inputSerializer = inputSerializer;
+		this.isProcessingTime = isProcessingTime;
+	}
+
+	public TypeSerializer<IN> getInputSerializer() {
+		return inputSerializer;
+	}
+
+	protected abstract NFA<IN> getNFA() throws IOException;
+
+	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		if (isProcessingTime) {
+			// there can be no out of order elements in processing time
+			NFA<IN> nfa = getNFA();
+			processEvent(nfa, element.getValue(), System.currentTimeMillis());
+		} else {
+			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+
+			// event time processing
+			// we have to buffer the elements until we receive the proper watermark
+			if (getExecutionConfig().isObjectReuseEnabled()) {
+				// copy the StreamRecord so that it cannot be changed
+				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+			} else {
+				priorityQueue.offer(element);
+			}
+		}
+	}
+
+	/**
+	 * Process the given event by giving it to the NFA and outputting the produced set of matched
+	 * event sequences.
+	 *
+	 * @param nfa NFA to be used for the event detection
+	 * @param event The current event to be processed
+	 * @param timestamp The timestamp of the event
+	 */
+	protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/57ef6c31/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 6b087e3..753656f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -20,89 +20,123 @@ package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.PriorityQueue;
 
 /**
- * Base class for CEP pattern operator. The operator uses a {@link NFA} to detect complex event
- * patterns. The detected event patterns are then outputted to the down stream operators.
+ * Abstract CEP pattern operator which is used for non keyed streams. Consequently,
+ * the operator state only includes a single {@link NFA} and a priority queue to order out of order
+ * elements in case of event time processing.
  *
  * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output elements
  */
-public abstract class AbstractCEPPatternOperator<IN>
-	extends AbstractStreamOperator<Map<String, IN>>
-	implements OneInputStreamOperator<IN, Map<String, IN>> {
+abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBasePatternOperator<IN, OUT> {
+	private static final long serialVersionUID = 7487334510746595640L;
 
-	private static final long serialVersionUID = -4166778210774160757L;
+	private final StreamRecordSerializer<IN> streamRecordSerializer;
 
-	protected static final int INITIAL_PRIORITY_QUEUE_CAPACITY = 11;
+	// global nfa for all elements
+	private NFA<IN> nfa;
 
-	private final TypeSerializer<IN> inputSerializer;
-	private final boolean isProcessingTime;
+	// queue to buffer out of order stream records
+	private transient PriorityQueue<StreamRecord<IN>> priorityQueue;
 
 	public AbstractCEPPatternOperator(
-			final TypeSerializer<IN> inputSerializer,
-			final boolean isProcessingTime) {
-		this.inputSerializer = inputSerializer;
-		this.isProcessingTime = isProcessingTime;
+			TypeSerializer<IN> inputSerializer,
+			boolean isProcessingTime,
+			NFACompiler.NFAFactory<IN> nfaFactory) {
+		super(inputSerializer, isProcessingTime);
+
+		this.streamRecordSerializer = new StreamRecordSerializer<>(inputSerializer);
+		this.nfa = nfaFactory.createNFA();
 	}
 
-	public TypeSerializer<IN> getInputSerializer() {
-		return inputSerializer;
+	@Override
+	public void open() {
+		if (priorityQueue == null) {
+			priorityQueue = new PriorityQueue<StreamRecord<IN>>(INITIAL_PRIORITY_QUEUE_CAPACITY, new StreamRecordComparator<IN>());
+		}
 	}
 
-	protected abstract NFA<IN> getNFA() throws IOException;
+	@Override
+	protected NFA<IN> getNFA() throws IOException {
+		return nfa;
+	}
 
-	protected abstract PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException;
+	@Override
+	protected PriorityQueue<StreamRecord<IN>> getPriorityQueue() throws IOException {
+		return priorityQueue;
+	}
 
 	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		if (isProcessingTime) {
-			// there can be no out of order elements in processing time
-			NFA<IN> nfa = getNFA();
-			processEvent(nfa, element.getValue(), System.currentTimeMillis());
-		} else {
-			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
-
-			// event time processing
-			// we have to buffer the elements until we receive the proper watermark
-			if (getExecutionConfig().isObjectReuseEnabled()) {
-				// copy the StreamRecord so that it cannot be changed
-				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
-			} else {
-				priorityQueue.offer(element);
-			}
+	public void processWatermark(Watermark mark) throws Exception {
+		while(!priorityQueue.isEmpty() && priorityQueue.peek().getTimestamp() <= mark.getTimestamp()) {
+			StreamRecord<IN> streamRecord = priorityQueue.poll();
+
+			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
 		}
+
+		output.emitWatermark(mark);
 	}
 
-	/**
-	 * Process the given event by giving it to the NFA and outputting the produced set of matched
-	 * event sequences.
-	 *
-	 * @param nfa NFA to be used for the event detection
-	 * @param event The current event to be processed
-	 * @param timestamp The timestamp of the event
-	 */
-	protected void processEvent(NFA<IN> nfa, IN event, long timestamp) {
-		Collection<Map<String, IN>> patterns = nfa.process(
-			event,
+	@Override
+	public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+		StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+
+		final AbstractStateBackend.CheckpointStateOutputStream os = this.getStateBackend().createCheckpointStateOutputStream(
+			checkpointId,
 			timestamp);
 
-		if (!patterns.isEmpty()) {
-			StreamRecord<Map<String, IN>> streamRecord = new StreamRecord<Map<String, IN>>(
-				null,
-				timestamp);
+		final ObjectOutputStream oos = new ObjectOutputStream(os);
+		final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);
+
+		oos.writeObject(nfa);
+
+		ov.writeInt(priorityQueue.size());
 
-			for (Map<String, IN> pattern : patterns) {
-				streamRecord.replace(pattern);
-				output.collect(streamRecord);
-			}
+		for (StreamRecord<IN> streamRecord: priorityQueue) {
+			streamRecordSerializer.serialize(streamRecord, ov);
 		}
+
+		taskState.setOperatorState(os.closeAndGetHandle());
+
+		return taskState;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+		super.restoreState(state, recoveryTimestamp);
+
+		StreamStateHandle stream = (StreamStateHandle)state.getOperatorState();
+
+		final InputStream is = stream.getState(getUserCodeClassloader());
+		final ObjectInputStream ois = new ObjectInputStream(is);
+		final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);
+
+		nfa = (NFA<IN>)ois.readObject();
+
+		int numberPriorityQueueEntries = div.readInt();
+
+		priorityQueue = new PriorityQueue<StreamRecord<IN>>(numberPriorityQueueEntries, new StreamRecordComparator<IN>());
+
+		for (int i = 0; i <numberPriorityQueueEntries; i++) {
+			priorityQueue.offer(streamRecordSerializer.deserialize(div));
+		}
+
+		div.close();
 	}
 }