You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/02 16:50:59 UTC

[1/3] flink git commit: [FLINK-2727] [streaming] Add a base class for Message Queue Sources that acknowledge messages by ID.

Repository: flink
Updated Branches:
  refs/heads/master d486cde76 -> 6e0e67d2e


[FLINK-2727] [streaming] Add a base class for Message Queue Sources that acknowledge messages by ID.

This closes #1163


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daeab849
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daeab849
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daeab849

Branch: refs/heads/master
Commit: daeab84985bb510351b04e08f83cdd6ce8075225
Parents: d486cde
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Sep 22 13:23:56 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:07:37 2015 +0200

----------------------------------------------------------------------
 .../runtime/util/DataOutputSerializer.java      |   5 +
 .../streaming/api/checkpoint/Checkpointed.java  |   2 +-
 .../source/MessageAcknowledingSourceBase.java   | 173 ++++++++++++++++++
 .../api/state/SerializedCheckpointData.java     | 176 +++++++++++++++++++
 4 files changed, 355 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index 7f8105d..0e93544 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -29,6 +29,7 @@ import java.io.IOException;
 import java.io.UTFDataFormatException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.Arrays;
 
 /**
  * A simple and efficient serializer for the {@link java.io.DataOutput} interface.
@@ -66,6 +67,10 @@ public class DataOutputSerializer implements DataOutputView {
 	public byte[] getByteArray() {
 		return buffer;
 	}
+	
+	public byte[] getCopyOfBuffer() {
+		return Arrays.copyOf(buffer, position);
+	}
 
 	public void clear() {
 		this.position = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index cb49dba..ac1cbfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -61,5 +61,5 @@ public interface Checkpointed<T extends Serializable> {
 	 *
 	 * @param state The state to be restored. 
 	 */
-	void restoreState(T state);
+	void restoreState(T state) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
new file mode 100644
index 0000000..3817ede
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/MessageAcknowledingSourceBase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.state.SerializedCheckpointData;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Abstract base class for data sources that receive elements from a message queue and
+ * acknowledge them back by IDs.
+ * <p>
+ * The mechanism for this source assumes that messages are identified by a unique ID.
+ * When messages are taken from the message queue, the message must not be dropped immediately,
+ * but must be retained until acknowledged. Messages that are not acknowledged within a certain
+ * time interval will be served again (to a different connection, established by the recovered source).
+ * <p>
+ * Note that this source can give no guarantees about message order in teh case of failures,
+ * because messages that were retrieved but not yet acknowledged will be returned later again, after
+ * a set of messages that was not retrieved before the failure.
+ * <p>
+ * Internally, this source gathers the IDs of elements it emits. Per checkpoint, the IDs are stored and
+ * acknowledged when the checkpoint is complete. That way, no message is acknowledged unless it is certain
+ * that it has been successfully processed throughout the topology and the updates to any state caused by
+ * that message are persistent.
+ * <p>
+ * All messages that are emitted and successfully processed by the streaming program will eventually be
+ * acknowledged. In corner cases, the source may acknowledge certain IDs multiple times, if a
+ * failure occurs while acknowledging.
+ * <p>
+ * A typical way to use this base in a source function is by implementing a run() method as follows:
+ * <pre>{@code
+ * public void run(SourceContext<Type> ctx) throws Exception {
+ *     while (running) {
+ *         Message msg = queue.retrieve();
+ *         synchronized (ctx.getCheckpointLock()) {
+ *             ctx.collect(msg.getMessageData());
+ *             addId(msg.getMessageId());
+ *         }
+ *     }
+ * }
+ * }</pre>
+ * 
+ * @param <Type> The type of the messages created by the source.
+ * @param <Id> The type of the IDs that are used for acknowledging elements.
+ */
+public abstract class MessageAcknowledingSourceBase<Type, Id> extends RichSourceFunction<Type> 
+	implements Checkpointed<SerializedCheckpointData[]>, CheckpointNotifier {
+	
+	private static final long serialVersionUID = -8689291992192955579L;
+	
+	/** Serializer used to serialize the IDs for checkpoints */
+	private final TypeSerializer<Id> idSerializer;
+	
+	/** The list gathering the IDs of messages emitted during the current checkpoint */
+	private transient List<Id> idsForCurrentCheckpoint;
+
+	/** The list with IDs from checkpoints that were triggered, but not yet completed or notified of completion */
+	private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;
+
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
+	 * 
+	 * @param idClass The class of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MessageAcknowledingSourceBase(Class<Id> idClass) {
+		this(TypeExtractor.getForClass(idClass));
+	}
+
+	/**
+	 * Creates a new MessageAcknowledingSourceBase for IDs of teh given type.
+	 * 
+	 * @param idTypeInfo The type information of the message ID type, used to create a serializer for the message IDs.
+	 */
+	protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
+		this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		idsForCurrentCheckpoint = new ArrayList<>(64);
+		pendingCheckpoints = new ArrayDeque<>();
+	}
+
+	@Override
+	public void close() throws Exception {
+		idsForCurrentCheckpoint.clear();
+		pendingCheckpoints.clear();
+	}
+
+
+	// ------------------------------------------------------------------------
+	//  ID Checkpointing
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method must be implemented to acknowledge the given set of IDs back to the message queue.
+	 * @param ids The list od IDs to acknowledge.
+	 */
+	protected abstract void acknowledgeIDs(List<Id> ids);
+
+	/**
+	 * Adds an ID to be stored with the current checkpoint.
+	 * @param id The ID to add.
+	 */
+	protected void addId(Id id) {
+		idsForCurrentCheckpoint.add(id);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing the data
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+		pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, idsForCurrentCheckpoint));
+		idsForCurrentCheckpoint = new ArrayList<>(64);
+		
+		return SerializedCheckpointData.fromDeque(pendingCheckpoints, idSerializer);
+	}
+
+	@Override
+	public void restoreState(SerializedCheckpointData[] state) throws Exception {
+		pendingCheckpoints = SerializedCheckpointData.toDeque(state, idSerializer);
+	}
+
+	@Override
+	public void notifyCheckpointComplete(long checkpointId) throws Exception {
+		for (Iterator<Tuple2<Long, List<Id>>> iter = pendingCheckpoints.iterator(); iter.hasNext();) {
+			Tuple2<Long, List<Id>> checkpoint = iter.next();
+			long id = checkpoint.f0;
+			
+			if (id <= checkpointId) {
+				acknowledgeIDs(checkpoint.f1);
+				iter.remove();
+			}
+			else {
+				break;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/daeab849/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
new file mode 100644
index 0000000..2bbb4e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/SerializedCheckpointData.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class represents serialized checkpoint data for a collection of elements.
+ */
+public class SerializedCheckpointData implements java.io.Serializable {
+
+	private static final long serialVersionUID = -8783744683896503488L;
+	
+	/** ID of the checkpoint for which the IDs are stored */
+	private final long checkpointId;
+
+	/** The serialized elements */
+	private final byte[] serializedData;
+
+	/** The number of elements in the checkpoint */
+	private final int numIds;
+
+	/**
+	 * Creates a SerializedCheckpointData object for the given serialized data.
+	 * 
+	 * @param checkpointId The checkpointId of the checkpoint.
+	 * @param serializedData The serialized IDs in this checkpoint.
+	 * @param numIds The number of IDs in the checkpoint.
+	 */
+	public SerializedCheckpointData(long checkpointId, byte[] serializedData, int numIds) {
+		this.checkpointId = checkpointId;
+		this.serializedData = serializedData;
+		this.numIds = numIds;
+	}
+
+	/**
+	 * Gets the checkpointId of the checkpoint.
+	 * @return The checkpointId of the checkpoint.
+	 */
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	/**
+	 * Gets the binary data for the serialized elements.
+	 * @return The binary data for the serialized elements.
+	 */
+	public byte[] getSerializedData() {
+		return serializedData;
+	}
+
+	/**
+	 * Gets the number of IDs in the checkpoint.
+	 * @return The number of IDs in the checkpoint.
+	 */
+	public int getNumIds() {
+		return numIds;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialize to Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Converts a list of checkpoints with elements into an array of SerializedCheckpointData.
+	 * 
+	 * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the 
+	 * 
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+												TypeSerializer<T> serializer) throws IOException {
+		return fromDeque(checkpoints, serializer, new DataOutputSerializer(128));
+	}
+
+	/**
+	 * Converts a list of checkpoints into an array of SerializedCheckpointData.
+	 *
+	 * @param checkpoints The checkpoints to be converted into IdsCheckpointData.
+	 * @param serializer The serializer to serialize the IDs.
+	 * @param outputBuffer The reusable serialization buffer.
+	 * @param <T> The type of the ID.
+	 * @return An array of serializable SerializedCheckpointData, one per entry in the 
+	 *
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> SerializedCheckpointData[] fromDeque(ArrayDeque<Tuple2<Long, List<T>>> checkpoints,
+												TypeSerializer<T> serializer,
+												DataOutputSerializer outputBuffer) throws IOException {
+		SerializedCheckpointData[] serializedCheckpoints = new SerializedCheckpointData[checkpoints.size()];
+		
+		int pos = 0;
+		for (Tuple2<Long, List<T>> checkpoint : checkpoints) {
+			outputBuffer.clear();
+			List<T> checkpointIds = checkpoint.f1;
+			
+			for (T id : checkpointIds) {
+				serializer.serialize(id, outputBuffer);
+			}
+
+			serializedCheckpoints[pos++] = new SerializedCheckpointData(
+					checkpoint.f0, outputBuffer.getCopyOfBuffer(), checkpointIds.size());
+		}
+		
+		return serializedCheckpoints;
+	}
+
+	// ------------------------------------------------------------------------
+	//  De-Serialize from Checkpoint
+	// ------------------------------------------------------------------------
+
+	/**
+	 * De-serializes an array of SerializedCheckpointData back into an ArrayDeque of element checkpoints.
+	 * 
+	 * @param data The data to be deserialized.
+	 * @param serializer The serializer used to deserialize the data.
+	 * @param <T> The type of the elements.
+	 * @return An ArrayDeque of element checkpoints.
+	 * 
+	 * @throws IOException Thrown, if the serialization fails.
+	 */
+	public static <T> ArrayDeque<Tuple2<Long, List<T>>> toDeque(
+			SerializedCheckpointData[] data, TypeSerializer<T> serializer) throws IOException
+	{
+		ArrayDeque<Tuple2<Long, List<T>>> deque = new ArrayDeque<>(data.length);
+		DataInputDeserializer deser = null;
+		
+		for (SerializedCheckpointData checkpoint : data) {
+			byte[] serializedData = checkpoint.getSerializedData();
+			if (deser == null) {
+				deser = new DataInputDeserializer(serializedData, 0, serializedData.length);
+			}
+			else {
+				deser.setBuffer(serializedData, 0, serializedData.length);
+			}
+			
+			final List<T> ids = new ArrayList<>(checkpoint.getNumIds());
+			final int numIds = checkpoint.getNumIds();
+			
+			for (int i = 0; i < numIds; i++) {
+				ids.add(serializer.deserialize(deser));
+			}
+
+			deque.addLast(new Tuple2<Long, List<T>>(checkpoint.checkpointId, ids));
+		}
+		
+		return deque;
+	}
+}


[2/3] flink git commit: [hotfix] [streaming] Apply closure cleaner to KeyedWindowFunction

Posted by se...@apache.org.
[hotfix] [streaming] Apply closure cleaner to KeyedWindowFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73902011
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73902011
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73902011

Branch: refs/heads/master
Commit: 739020110251aa98773695163bb23b52c48f8987
Parents: daeab84
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 19:59:57 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:10:53 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/KeyedWindowDataStream.java     | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73902011/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index ad7ca37..302a645 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -178,6 +178,9 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
 	 * @return The data stream that is the result of applying the window function to the window.
 	 */
 	public <R> DataStream<R> mapWindow(KeyedWindowFunction<T, R, K, W> function) {
+		// clean the closure
+		function = input.getExecutionEnvironment().clean(function);
+		
 		TypeInformation<T> inType = input.getType();
 		TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
 				function, KeyedWindowFunction.class, true, true, inType, null, false);


[3/3] flink git commit: [hotfix] [streaming] Handle rich functions properly in aligned time windows

Posted by se...@apache.org.
[hotfix] [streaming] Handle rich functions properly in aligned time windows


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e0e67d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e0e67d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e0e67d2

Branch: refs/heads/master
Commit: 6e0e67d2e0d5180d6fba492e8ab9cc8fb18fdf68
Parents: 7390201
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 20:44:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:20:40 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedWindowDataStream.java   |  5 ++-
 .../api/operators/AbstractStreamOperator.java   |  5 ++-
 .../operators/AbstractUdfStreamOperator.java    | 33 +++++++++++----
 ...ractAlignedProcessingTimeWindowOperator.java | 22 +++++++---
 ...ccumulatingProcessingTimeWindowOperator.java |  5 ++-
 ...AggregatingProcessingTimeWindowOperator.java |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  6 +--
 .../runtime/operators/StreamTaskTimerTest.java  | 19 +++++++--
 ...AlignedProcessingTimeWindowOperatorTest.java | 44 ++++++++++----------
 ...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++----
 10 files changed, 101 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 302a645..9d05b8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
@@ -248,7 +249,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
 			}
 			else if (function instanceof KeyedWindowFunction) {
 				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(), windowLength, windowSlide);
@@ -273,7 +274,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
 			}
 			else if (function instanceof KeyedWindowFunction) {
 				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(), windowLength, windowSlide);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 07d8312..77bd130 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,7 +24,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
- * Base class for operators that do not contain a user-defined function.
+ * Base class for all stream operators.
+ * 
+ * Operators that contain a user function should extend the class 
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
  * 
  * @param <OUT> The output type of the operator
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f679d5f..c0d71e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -41,29 +42,45 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This is used as the base class for operators that have a user-defined
- * function.
+ * function. This class handles the opening and closing of the user-defined functions,
+ * as part of the operator life cycle.
  * 
  * @param <OUT>
  *            The output type of the operator
  * @param <F>
  *            The type of the user function
  */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> 
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function> 
 		extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
 
 	private static final long serialVersionUID = 1L;
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
 	
-
+	
+	/** the user function */
 	protected final F userFunction;
 	
+	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
 	private boolean functionsClosed = false;
 
+	
 	public AbstractUdfStreamOperator(F userFunction) {
-		this.userFunction = userFunction;
+		this.userFunction = Objects.requireNonNull(userFunction);
 	}
 
+	/**
+	 * Gets the user function executed in this operator.
+	 * @return The user function of this operator.
+	 */
+	public F getUserFunction() {
+		return userFunction;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  operator life cycle
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
 		super.setup(output, runtimeContext);
@@ -97,6 +114,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  checkpointing and recovery
+	// ------------------------------------------------------------------------
+	
 	@Override
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
@@ -170,8 +191,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 			}
 		}
 	}
-
-	public F getUserFunction() {
-		return userFunction;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index a81340f..4fcfb2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -33,7 +33,8 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> 
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function> 
+		extends AbstractUdfStreamOperator<OUT, F> 
 		implements OneInputStreamOperator<IN, OUT>, Triggerable {
 	
 	private static final long serialVersionUID = 3245500864882459867L;
@@ -60,11 +61,13 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	private transient long nextSlideTime;
 	
 	protected AbstractAlignedProcessingTimeWindowOperator(
-			Function function,
+			F function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)
 	{
+		super(function);
+		
 		if (function == null || keySelector == null) {
 			throw new NullPointerException();
 		}
@@ -103,6 +106,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		
 		out = new TimestampedCollector<>(output);
 		
 		// create the panes that gather the elements per slide
@@ -119,6 +124,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void close() throws Exception {
+		super.close();
+		
 		final long finalWindowTimestamp = nextEvaluationTime;
 
 		// early stop the triggering thread, so it does not attempt to return any more data
@@ -130,12 +137,17 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void dispose() {
+		super.dispose();
+		
 		// acquire the lock during shutdown, to prevent trigger calls at the same time
 		// fail-safe stop of the triggering thread (in case of an error)
 		stopTriggers();
 
-		// release the panes
-		panes.dispose();
+		// release the panes. panes may still be null if dispose is called
+		// after open() failed
+		if (panes != null) {
+			panes.dispose();
+		}
 	}
 	
 	private void stopTriggers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 8edb76f..ace3823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -21,17 +21,18 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>>  {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
 	
 	public AccumulatingProcessingTimeWindowOperator(
-			KeyedWindowFunction<IN, OUT, KEY, Window> function,
+			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 99457bb..cc38019 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 
 public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b35350..16b8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -86,6 +86,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
+	/** The thread group that holds all trigger timer threads */
+	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
+	
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
 	/**
@@ -104,9 +107,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected ClassLoader userClassLoader;
 
-	/** The thread group that holds all trigger timer threads */
-	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-
 	/** The executor service that */
 	private ScheduledExecutorService timerService;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 2aed041..67df3ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -25,8 +26,10 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -38,7 +41,8 @@ import static org.junit.Assert.*;
  * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
+@PrepareForTest(ResultPartitionWriter.class)
+@SuppressWarnings("serial")
 public class StreamTaskTimerTest {
 
 	@Test
@@ -47,7 +51,8 @@ public class StreamTaskTimerTest {
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<>(null);
+		
+		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
 		streamConfig.setStreamOperator(mapOperator);
 
 		testHarness.invoke();
@@ -77,12 +82,11 @@ public class StreamTaskTimerTest {
 	@Test
 	public void checkScheduledTimestampe() {
 		try {
-
 			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
-			StreamMap<String, String> mapOperator = new StreamMap<>(null);
+			StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
 			streamConfig.setStreamOperator(mapOperator);
 
 			testHarness.invoke();
@@ -162,4 +166,11 @@ public class StreamTaskTimerTest {
 			}
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) { return value; }
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4327e11..99a2e14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
 import org.apache.flink.util.Collector;
+
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -45,11 +45,11 @@ import java.util.concurrent.TimeUnit;
 import static org.mockito.Mockito.*;
 import static org.junit.Assert.*;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final KeyedWindowFunction<String, String, String, Window> mockFunction = mock(KeyedWindowFunction.class);
+	private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final KeyedWindowFunction<Integer, Integer, Integer, Window> validatingIdentityFunction =
-			new KeyedWindowFunction<Integer, Integer, Integer, Window>()
+	private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
 	{
 		@Override
-		public void evaluate(Integer key, Window window, Iterable<Integer> values, Collector<Integer> out) {
+		public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
 			for (Integer val : values) {
 				assertEquals(key, val);
 				out.collect(val);
@@ -112,7 +112,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
 			
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -153,7 +153,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			op.setup(mockOut, mockContext);
@@ -199,7 +199,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			// tumbling window that triggers every 20 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
 							validatingIdentityFunction, identitySelector, windowSize, windowSize);
 
@@ -240,7 +240,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			// tumbling window that triggers every 20 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
@@ -299,9 +299,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			final Object lock = new Object();
 
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -321,7 +321,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
 
 			// tumbling window that triggers every 20 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
 
 			op.setup(out, mockContext);
@@ -374,9 +374,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			final Object lock = new Object();
 
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -396,7 +396,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
 
 			// tumbling window that triggers every 20 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
@@ -438,7 +438,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			
 			// the operator has a window time that is so long that it will not fire in this test
 			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = 
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op = 
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
 							oneYear, oneYear);
 			
@@ -472,11 +472,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
-			KeyedWindowFunction<Integer, Integer, Integer, Window> failingFunction = new FailingFunction(100);
+			KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
 
 			// the operator has a window time that is so long that it will not fire in this test
 			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
 							failingFunction, identitySelector, hundredYears, hundredYears);
 
@@ -523,7 +523,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, Window> {
+	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		private final int failAfterElements;
 		
@@ -534,7 +534,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public void evaluate(Integer integer, Window window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+		public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
 			for (Integer i : values) {
 				out.collect(i);
 				numElements++;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ad9dd4..fa90e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -51,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
@@ -113,7 +112,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AggregatingProcessingTimeWindowOperator<String, String> op;
 			
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -153,8 +152,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+
+			AggregatingProcessingTimeWindowOperator<String, String> op;
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			op.setup(mockOut, mockContext);
@@ -244,9 +243,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			final Object lock = new Object();
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -380,9 +379,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			final Object lock = new Object();
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(