You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/02 16:51:01 UTC

[3/3] flink git commit: [hotfix] [streaming] Handle rich functions properly in aligned time windows

[hotfix] [streaming] Handle rich functions properly in aligned time windows


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e0e67d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e0e67d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e0e67d2

Branch: refs/heads/master
Commit: 6e0e67d2e0d5180d6fba492e8ab9cc8fb18fdf68
Parents: 7390201
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 1 20:44:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 2 15:20:40 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/KeyedWindowDataStream.java   |  5 ++-
 .../api/operators/AbstractStreamOperator.java   |  5 ++-
 .../operators/AbstractUdfStreamOperator.java    | 33 +++++++++++----
 ...ractAlignedProcessingTimeWindowOperator.java | 22 +++++++---
 ...ccumulatingProcessingTimeWindowOperator.java |  5 ++-
 ...AggregatingProcessingTimeWindowOperator.java |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  6 +--
 .../runtime/operators/StreamTaskTimerTest.java  | 19 +++++++--
 ...AlignedProcessingTimeWindowOperatorTest.java | 44 ++++++++++----------
 ...AlignedProcessingTimeWindowOperatorTest.java | 17 ++++----
 10 files changed, 101 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
index 302a645..9d05b8c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedWindowDataStream.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
 import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator;
@@ -248,7 +249,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
 			}
 			else if (function instanceof KeyedWindowFunction) {
 				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(), windowLength, windowSlide);
@@ -273,7 +274,7 @@ public class KeyedWindowDataStream<T, K, W extends Window> {
 			}
 			else if (function instanceof KeyedWindowFunction) {
 				@SuppressWarnings("unchecked")
-				KeyedWindowFunction<T, R, K, Window> wf = (KeyedWindowFunction<T, R, K, Window>) function;
+				KeyedWindowFunction<T, R, K, TimeWindow> wf = (KeyedWindowFunction<T, R, K, TimeWindow>) function;
 
 				OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>(
 						wf, input.getKeySelector(), windowLength, windowSlide);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 07d8312..77bd130 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,7 +24,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
- * Base class for operators that do not contain a user-defined function.
+ * Base class for all stream operators.
+ * 
+ * Operators that contain a user function should extend the class 
+ * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass of this class).
  * 
  * @param <OUT> The output type of the operator
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index f679d5f..c0d71e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
@@ -41,29 +42,45 @@ import org.slf4j.LoggerFactory;
 
 /**
  * This is used as the base class for operators that have a user-defined
- * function.
+ * function. This class handles the opening and closing of the user-defined functions,
+ * as part of the operator life cycle.
  * 
  * @param <OUT>
  *            The output type of the operator
  * @param <F>
  *            The type of the user function
  */
-public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> 
+public abstract class AbstractUdfStreamOperator<OUT, F extends Function> 
 		extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
 
 	private static final long serialVersionUID = 1L;
 	
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractUdfStreamOperator.class);
 	
-
+	
+	/** the user function */
 	protected final F userFunction;
 	
+	/** Flag to prevent duplicate function.close() calls in close() and dispose() */
 	private boolean functionsClosed = false;
 
+	
 	public AbstractUdfStreamOperator(F userFunction) {
-		this.userFunction = userFunction;
+		this.userFunction = Objects.requireNonNull(userFunction);
 	}
 
+	/**
+	 * Gets the user function executed in this operator.
+	 * @return The user function of this operator.
+	 */
+	public F getUserFunction() {
+		return userFunction;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  operator life cycle
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public final void setup(Output<StreamRecord<OUT>> output, StreamingRuntimeContext runtimeContext) {
 		super.setup(output, runtimeContext);
@@ -97,6 +114,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  checkpointing and recovery
+	// ------------------------------------------------------------------------
+	
 	@Override
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, OperatorStateHandle>> snapshots) throws Exception {
@@ -170,8 +191,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 			}
 		}
 	}
-
-	public F getUserFunction() {
-		return userFunction;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index a81340f..4fcfb2c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.MathUtils;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -33,7 +33,8 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 
-public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT> extends AbstractStreamOperator<OUT> 
+public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, F extends Function> 
+		extends AbstractUdfStreamOperator<OUT, F> 
 		implements OneInputStreamOperator<IN, OUT>, Triggerable {
 	
 	private static final long serialVersionUID = 3245500864882459867L;
@@ -60,11 +61,13 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 	private transient long nextSlideTime;
 	
 	protected AbstractAlignedProcessingTimeWindowOperator(
-			Function function,
+			F function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)
 	{
+		super(function);
+		
 		if (function == null || keySelector == null) {
 			throw new NullPointerException();
 		}
@@ -103,6 +106,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		
 		out = new TimestampedCollector<>(output);
 		
 		// create the panes that gather the elements per slide
@@ -119,6 +124,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void close() throws Exception {
+		super.close();
+		
 		final long finalWindowTimestamp = nextEvaluationTime;
 
 		// early stop the triggering thread, so it does not attempt to return any more data
@@ -130,12 +137,17 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>
 
 	@Override
 	public void dispose() {
+		super.dispose();
+		
 		// acquire the lock during shutdown, to prevent trigger calls at the same time
 		// fail-safe stop of the triggering thread (in case of an error)
 		stopTriggers();
 
-		// release the panes
-		panes.dispose();
+		// release the panes. panes may still be null if dispose is called
+		// after open() failed
+		if (panes != null) {
+			panes.dispose();
+		}
 	}
 	
 	private void stopTriggers() {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
index 8edb76f..ace3823 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingProcessingTimeWindowOperator.java
@@ -21,17 +21,18 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
 
 public class AccumulatingProcessingTimeWindowOperator<KEY, IN, OUT> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, KeyedWindowFunction<IN, OUT, KEY, TimeWindow>>  {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 
 	
 	public AccumulatingProcessingTimeWindowOperator(
-			KeyedWindowFunction<IN, OUT, KEY, Window> function,
+			KeyedWindowFunction<IN, OUT, KEY, TimeWindow> function,
 			KeySelector<IN, KEY> keySelector,
 			long windowLength,
 			long windowSlide)

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
index 99457bb..cc38019 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingProcessingTimeWindowOperator.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 
 public class AggregatingProcessingTimeWindowOperator<KEY, IN> 
-		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN>  {
+		extends AbstractAlignedProcessingTimeWindowOperator<KEY, IN, IN, ReduceFunction<IN>> {
 
 	private static final long serialVersionUID = 7305948082830843475L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b35350..16b8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -86,6 +86,9 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
 
+	/** The thread group that holds all trigger timer threads */
+	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
+	
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
 	/**
@@ -104,9 +107,6 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected ClassLoader userClassLoader;
 
-	/** The thread group that holds all trigger timer threads */
-	public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
-
 	/** The executor service that schedules the trigger timers */
 	private ScheduledExecutorService timerService;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 2aed041..67df3ad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -25,8 +26,10 @@ import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
+
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -38,7 +41,8 @@ import static org.junit.Assert.*;
  * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
+@PrepareForTest(ResultPartitionWriter.class)
+@SuppressWarnings("serial")
 public class StreamTaskTimerTest {
 
 	@Test
@@ -47,7 +51,8 @@ public class StreamTaskTimerTest {
 		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamMap<String, String> mapOperator = new StreamMap<>(null);
+		
+		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
 		streamConfig.setStreamOperator(mapOperator);
 
 		testHarness.invoke();
@@ -77,12 +82,11 @@ public class StreamTaskTimerTest {
 	@Test
 	public void checkScheduledTimestampe() {
 		try {
-
 			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
 			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
 			StreamConfig streamConfig = testHarness.getStreamConfig();
-			StreamMap<String, String> mapOperator = new StreamMap<>(null);
+			StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
 			streamConfig.setStreamOperator(mapOperator);
 
 			testHarness.invoke();
@@ -162,4 +166,11 @@ public class StreamTaskTimerTest {
 			}
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	public static class DummyMapFunction<T> implements MapFunction<T, T> {
+		@Override
+		public T map(T value) { return value; }
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 4327e11..99a2e14 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.windowing.KeyedWindowFunction;
 import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
 import org.apache.flink.util.Collector;
+
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -45,11 +45,11 @@ import java.util.concurrent.TimeUnit;
 import static org.mockito.Mockito.*;
 import static org.junit.Assert.*;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
 public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
-	private final KeyedWindowFunction<String, String, String, Window> mockFunction = mock(KeyedWindowFunction.class);
+	private final KeyedWindowFunction<String, String, String, TimeWindow> mockFunction = mock(KeyedWindowFunction.class);
 
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
@@ -61,11 +61,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	};
 	
-	private final KeyedWindowFunction<Integer, Integer, Integer, Window> validatingIdentityFunction =
-			new KeyedWindowFunction<Integer, Integer, Integer, Window>()
+	private final KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
+			new KeyedWindowFunction<Integer, Integer, Integer, TimeWindow>()
 	{
 		@Override
-		public void evaluate(Integer key, Window window, Iterable<Integer> values, Collector<Integer> out) {
+		public void evaluate(Integer key, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) {
 			for (Integer val : values) {
 				assertEquals(key, val);
 				out.collect(val);
@@ -112,7 +112,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
 			
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -153,7 +153,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AbstractAlignedProcessingTimeWindowOperator<String, String, String, ?> op;
 
 			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			op.setup(mockOut, mockContext);
@@ -199,7 +199,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			// tumbling window that triggers every 20 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
 							validatingIdentityFunction, identitySelector, windowSize, windowSize);
 
@@ -240,7 +240,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			// sliding window of 150 milliseconds that slides every 50 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
@@ -299,9 +299,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			final Object lock = new Object();
 
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -321,7 +321,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
 
 			// tumbling window that triggers every 50 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
 
 			op.setup(out, mockContext);
@@ -374,9 +374,9 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			final Object lock = new Object();
 
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -396,7 +396,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
 
 			// sliding window of 150 milliseconds that slides every 50 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
 
 			op.setup(out, mockContext);
@@ -438,7 +438,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			
 			// the operator has a window time that is so long that it will not fire in this test
 			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = 
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op = 
 					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
 							oneYear, oneYear);
 			
@@ -472,11 +472,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
-			KeyedWindowFunction<Integer, Integer, Integer, Window> failingFunction = new FailingFunction(100);
+			KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
 
 			// the operator has a window time that is so long that it will not fire in this test
 			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer, ?> op =
 					new AccumulatingProcessingTimeWindowOperator<>(
 							failingFunction, identitySelector, hundredYears, hundredYears);
 
@@ -523,7 +523,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, Window> {
+	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer, TimeWindow> {
 
 		private final int failAfterElements;
 		
@@ -534,7 +534,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public void evaluate(Integer integer, Window window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+		public void evaluate(Integer integer, TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
 			for (Integer i : values) {
 				out.collect(i);
 				numElements++;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e0e67d2/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 7ad9dd4..fa90e4a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -42,7 +42,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -51,7 +50,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-@SuppressWarnings("serial")
+@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
 public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	@SuppressWarnings("unchecked")
@@ -113,7 +112,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@Test
 	public void testWindowSizeAndSlide() {
 		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+			AggregatingProcessingTimeWindowOperator<String, String> op;
 			
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			assertEquals(5000, op.getWindowSize());
@@ -153,8 +152,8 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			
 			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
 			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
+
+			AggregatingProcessingTimeWindowOperator<String, String> op;
 
 			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
 			op.setup(mockOut, mockContext);
@@ -244,9 +243,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			final Object lock = new Object();
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(
@@ -380,9 +379,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			when(mockContext.getTaskName()).thenReturn("Test task name");
 
 			final Object lock = new Object();
-			doAnswer(new Answer() {
+			doAnswer(new Answer<Void>() {
 				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
 					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
 					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
 					timerService.schedule(