Posted to commits@flink.apache.org by se...@apache.org on 2015/10/25 21:24:36 UTC

[1/3] flink git commit: [FLINK-2891] [streaming] Set keys for key/value state in window evaluation of fast-path windows.

Repository: flink
Updated Branches:
  refs/heads/release-0.10 ec1730bf5 -> 65fcd3abd


[FLINK-2891] [streaming] Set keys for key/value state in window evaluation of fast-path windows.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fcd3ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fcd3ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fcd3ab

Branch: refs/heads/release-0.10
Commit: 65fcd3abdd4d9ad78c12b95ac7ad0b1e2926609e
Parents: 8e4cb0a
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 18:58:29 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100

----------------------------------------------------------------------
 .../api/operators/AbstractStreamOperator.java   |   9 +
 ...ractAlignedProcessingTimeWindowOperator.java |   2 +-
 .../windowing/AbstractKeyedTimePanes.java       |   3 +-
 .../windowing/AccumulatingKeyedTimePanes.java   |  22 +-
 .../windowing/AggregatingKeyedTimePanes.java    |  13 +-
 ...AlignedProcessingTimeWindowOperatorTest.java | 116 ++++-
 ...AlignedProcessingTimeWindowOperatorTest.java | 420 +++++++++++++++----
 7 files changed, 485 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
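In essence, the fast-path (aligned) processing-time window operators never set the current key on the key/value state while evaluating windows, so stateful window and reduce functions could not correctly access their per-key state. The changes below pass the operator into the pane evaluation and call setKeyContext(key) before each per-key evaluation. A simplified, self-contained sketch of that pattern (stand-in names, not the actual Flink classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical stand-in for the fast-path evaluation loop: the key context is set
    // before the user function runs, so keyed state lookups resolve to that key.
    public final class KeyContextSketch {

        // stand-in for AbstractStreamOperator#setKeyContext(Object)
        static void setKeyContext(Object key) {
            System.out.println("key context now: " + key);
        }

        // stand-in for the user's window function; with the key context set, real
        // Flink code could safely call getRuntimeContext().getKeyValueState(...)
        static void apply(Integer key, List<Integer> values, List<Integer> out) {
            out.addAll(values);
        }

        public static void main(String[] args) {
            Map<Integer, List<Integer>> latestPane = new HashMap<>();
            latestPane.put(1, new ArrayList<>(Arrays.asList(10, 11)));
            latestPane.put(2, new ArrayList<>(Arrays.asList(20)));

            List<Integer> out = new ArrayList<>();
            for (Map.Entry<Integer, List<Integer>> entry : latestPane.entrySet()) {
                setKeyContext(entry.getKey());               // point keyed state at this key first
                apply(entry.getKey(), entry.getValue(), out);
            }
            System.out.println(out);
        }
    }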


http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 078679d..9074b7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -343,6 +343,15 @@ public abstract class AbstractStreamOperator<OUT>
 			}
 		}
 	}
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	public void setKeyContext(Object key) {
+		if (keyValueStates != null) {
+			for (KvState kv : keyValueStates) {
+				kv.setCurrentKey(key);
+			}
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Context and chaining properties

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 3165f88..90d3d82 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -239,7 +239,7 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	private void computeWindow(long timestamp) throws Exception {
 		out.setTimestamp(timestamp);
 		panes.truncatePanes(numPanesPerWindow);
-		panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize));
+		panes.evaluateWindow(out, new TimeWindow(timestamp, timestamp + windowSize), this);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
index d1cea20..89ce47b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractKeyedTimePanes.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
@@ -47,7 +48,7 @@ public abstract class AbstractKeyedTimePanes<Type, Key, Aggregate, Result> {
 
 	public abstract void addElementToLatestPane(Type element) throws Exception;
 
-	public abstract void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception;
+	public abstract void evaluateWindow(Collector<Result> out, TimeWindow window, AbstractStreamOperator<Result> operator) throws Exception;
 	
 	
 	public void dispose() {

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index c854e6c..e15de8e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.util.UnionIterator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -57,16 +58,21 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 	}
 
 	@Override
-	public void evaluateWindow(Collector<Result> out, TimeWindow window) throws Exception {
+	public void evaluateWindow(Collector<Result> out, TimeWindow window, 
+								AbstractStreamOperator<Result> operator) throws Exception
+	{
 		if (previousPanes.isEmpty()) {
 			// optimized path for single pane case (tumbling window)
 			for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
+				Key key = entry.getKey();
+				operator.setKeyContext(key);
 				function.apply(entry.getKey(), window, entry.getValue(), out);
 			}
 		}
 		else {
 			// general code path for multi-pane case
-			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(function, window, out);
+			WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
+					function, window, out, operator);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
 		
@@ -84,16 +90,21 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 		private final UnionIterator<Type> unionIterator;
 		
 		private final Collector<Result> out;
+
+		private final TimeWindow window;
+		
+		private final AbstractStreamOperator<Result> contextOperator;
 		
 		private Key currentKey;
+		
 
-		private TimeWindow window;
-
-		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, Collector<Result> out) {
+		WindowFunctionTraversal(WindowFunction<Type, Result, Key, Window> function, TimeWindow window, 
+								Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
 			this.function = function;
 			this.out = out;
 			this.unionIterator = new UnionIterator<>();
 			this.window = window;
+			this.contextOperator = contextOperator;
 		}
 
 
@@ -110,6 +121,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
 
 		@Override
 		public void keyDone() throws Exception {
+			contextOperator.setKeyContext(currentKey);
 			function.apply(currentKey, window, unionIterator, out);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index d395b2a..8599bc1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
@@ -50,7 +51,8 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 	}
 
 	@Override
-	public void evaluateWindow(Collector<Type> out, TimeWindow window) throws Exception {
+	public void evaluateWindow(Collector<Type> out, TimeWindow window, 
+								AbstractStreamOperator<Type> operator) throws Exception {
 		if (previousPanes.isEmpty()) {
 			// optimized path for single pane case
 			for (KeyMap.Entry<Key, Type> entry : latestPane) {
@@ -59,7 +61,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 		}
 		else {
 			// general code path for multi-pane case
-			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out);
+			AggregatingTraversal<Key, Type> evaluator = new AggregatingTraversal<>(reducer, out, operator);
 			traverseAllPanes(evaluator, evaluationPass);
 		}
 		
@@ -76,16 +78,21 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
 		
 		private final Collector<Type> out;
 		
+		private final AbstractStreamOperator<Type> operator;
+		
 		private Type currentValue;
 
-		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out) {
+		AggregatingTraversal(ReduceFunction<Type> function, Collector<Type> out,
+								AbstractStreamOperator<Type> operator) {
 			this.function = function;
 			this.out = out;
+			this.operator = operator;
 		}
 
 		@Override
 		public void startNewKey(Key key) {
 			currentValue = null;
+			operator.setKeyContext(key);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index ad3c838..62eb268 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -20,11 +20,15 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -48,7 +52,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -89,6 +95,13 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 
+	public AccumulatingAlignedProcessingTimeWindowOperatorTest() {
+		ClosureCleaner.clean(identitySelector, false);
+		ClosureCleaner.clean(validatingIdentityFunction, false);
+	}
+	
+	// ------------------------------------------------------------------------
+
 	@After
 	public void checkNoTriggerThreadsRunning() {
 		// make sure that all the threads we trigger are shut down
@@ -544,6 +557,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 			// inject some elements
 			final int numElementsFirst = 700;
+			final int numElements = 1000;
 			for (int i = 0; i < numElementsFirst; i++) {
 				synchronized (lock) {
 					op.processElement(new StreamRecord<Integer>(i));
@@ -560,6 +574,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 				resultAtSnapshot = new ArrayList<>(out.getElements());
 				int afterSnapShot = out.getElements().size();
 				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
+				assertTrue(afterSnapShot <= numElementsFirst);
 			}
 
 			// inject some random elements, which should not show up in the state
@@ -584,7 +599,6 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 			op.open();
 
 			// inject some more elements
-			final int numElements = 1000;
 			for (int i = numElementsFirst; i < numElements; i++) {
 				synchronized (lock) {
 					op.processElement(new StreamRecord<Integer>(i));
@@ -725,6 +739,64 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	}
 	
+	@Test
+	public void testKeyValueStateInWindowFunction() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+		try {
+			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+			
+			StatefulFunction.globalCounts.clear();
+			
+			// tumbling window that triggers every 20 milliseconds
+			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
+					new AccumulatingProcessingTimeWindowOperator<>(
+							new StatefulFunction(), identitySelector,
+							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
+
+			op.setup(mockTask, createTaskConfig(identitySelector, IntSerializer.INSTANCE), out);
+			op.open();
+
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+			}
+			out.waitForNElements(2, 60000);
+
+			synchronized (lock) {
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(1));
+				op.processElement(new StreamRecord<Integer>(2));
+				op.processElement(new StreamRecord<Integer>(2));
+			}
+			out.waitForNElements(8, 60000);
+
+			List<Integer> result = out.getElements();
+			assertEquals(8, result.size());
+
+			Collections.sort(result);
+			assertEquals(Arrays.asList(1, 1, 1, 1, 2, 2, 2, 2), result);
+
+			assertEquals(4, StatefulFunction.globalCounts.get(1).intValue());
+			assertEquals(4, StatefulFunction.globalCounts.get(2).intValue());
+			
+			synchronized (lock) {
+				op.close();
+			}
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			timerService.shutdown();
+		}
+	}
+	
 	// ------------------------------------------------------------------------
 	
 	private void assertInvalidParameter(long windowSize, long windowSlide) {
@@ -771,6 +843,41 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
+	private static class StatefulFunction extends RichWindowFunction<Integer, Integer, Integer, TimeWindow> {
+		
+		// we use a concurrent map here even though there is no concurrency, to
+		// get "volatile" style access to entries
+		static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+		
+		private OperatorState<Integer> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			assertNotNull(getRuntimeContext());
+			state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 0);
+		}
+
+		@Override
+		public void apply(Integer key,
+						  TimeWindow window,
+						  Iterable<Integer> values,
+						  Collector<Integer> out) throws Exception {
+			for (Integer i : values) {
+				// we need to update this state before emitting elements. Else, the test's main
+				// thread will have received all output elements before the state is updated and
+				// the checks may fail
+				state.update(state.value() + 1);
+				globalCounts.put(key, state.value());
+				
+				out.collect(i);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
 	private static StreamTask<?, ?> createMockTask() {
 		StreamTask<?, ?> task = mock(StreamTask.class);
 		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
@@ -821,4 +928,11 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 
 		return mockTask;
 	}
+	
+	private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStatePartitioner(partitioner);
+		cfg.setStateKeySerializer(keySerializer);
+		return cfg;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/65fcd3ab/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 4bd260f..4d507fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -21,9 +21,16 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -33,8 +40,8 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+
 import org.junit.After;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
@@ -44,15 +51,19 @@ import org.mockito.stubbing.OngoingStubbing;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
@@ -70,20 +81,41 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	@SuppressWarnings("unchecked")
 	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
 	
-	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
+	private final KeySelector<Tuple2<Integer, Integer>, Integer> fieldOneSelector = 
+			new KeySelector<Tuple2<Integer,Integer>, Integer>() {
+				@Override
+				public Integer getKey(Tuple2<Integer,Integer> value) {
+					return value.f0;
+				}
+	};
+	
+	private final ReduceFunction<Tuple2<Integer, Integer>> sumFunction = new ReduceFunction<Tuple2<Integer, Integer>>() {
 		@Override
-		public Integer getKey(Integer value) {
-			return value;
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) {
+			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	};
 	
-	private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
+	private final TypeSerializer<Tuple2<Integer, Integer>> tupleSerializer = 
+			new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)
+					.createSerializer(new ExecutionConfig());
+
+	private final Comparator<Tuple2<Integer, Integer>> tupleComparator = new Comparator<Tuple2<Integer, Integer>>() {
 		@Override
-		public Integer reduce(Integer value1, Integer value2) {
-			return value1 + value2;
+		public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
+			int diff0 = o1.f0 - o2.f0;
+			int diff1 = o1.f1 - o2.f1;
+			return diff0 != 0 ? diff0 : diff1;
 		}
 	};
+	
+	// ------------------------------------------------------------------------
 
+	public AggregatingAlignedProcessingTimeWindowOperatorTest() {
+		ClosureCleaner.clean(fieldOneSelector, false);
+		ClosureCleaner.clean(sumFunction, false);
+	}
+	
 	// ------------------------------------------------------------------------
 
 	@After
@@ -211,12 +243,12 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 			
 			final Object lock = new Object();
@@ -229,7 +261,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			for (int i = 0; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -240,12 +274,13 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// get and verify the result
-			List<Integer> result = out.getElements();
+			List<Tuple2<Integer, Integer>> result = out.getElements();
 			assertEquals(numElements, result.size());
 
-			Collections.sort(result);
+			Collections.sort(result, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, result.get(i).intValue());
+				assertEquals(i, result.get(i).f0.intValue());
+				assertEquals(i, result.get(i).f1.intValue());
 			}
 		}
 		catch (Exception e) {
@@ -263,15 +298,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 		try {
 			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 			
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -286,8 +321,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				synchronized (lock) {
 					long nextTime = op.getNextEvaluationTime();
 					int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-					
-					op.processElement(new StreamRecord<Integer>(val));
+
+					StreamRecord<Tuple2<Integer, Integer>> next =  new StreamRecord<>(new Tuple2<>(val, val));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 
 					if (nextTime != previousNextTime) {
 						window++;
@@ -302,14 +339,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			}
 			op.dispose();
 			
-			List<Integer> result = out.getElements();
+			List<Tuple2<Integer, Integer>> result = out.getElements();
 			
 			// we have ideally one element per window. we may have more, when we emitted a value into the
 			// successive window (corner case), so we can have twice the number of elements, in the worst case.
 			assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
 
 			// deduplicate for more accurate checks
-			HashSet<Integer> set = new HashSet<>(result);
+			HashSet<Tuple2<Integer, Integer>> set = new HashSet<>(result);
 			assertTrue(set.size() == 10);
 		}
 		catch (Exception e) {
@@ -325,16 +362,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void testSlidingWindow() {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
 
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							150, 50);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -344,7 +381,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			for (int i = 0; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -355,7 +394,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// get and verify the result
-			List<Integer> result = out.getElements();
+			List<Tuple2<Integer, Integer>> result = out.getElements();
 			
 			// every element can occur between one and three times
 			if (result.size() < numElements || result.size() > 3 * numElements) {
@@ -363,17 +402,19 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 				fail("Wrong number of results: " + result.size());
 			}
 
-			Collections.sort(result);
+			Collections.sort(result, tupleComparator);
 			int lastNum = -1;
 			int lastCount = -1;
 			
-			for (int num : result) {
-				if (num == lastNum) {
+			for (Tuple2<Integer, Integer> val : result) {
+				assertEquals(val.f0, val.f1);
+				
+				if (val.f0 == lastNum) {
 					lastCount++;
 					assertTrue(lastCount <= 3);
 				}
 				else {
-					lastNum = num;
+					lastNum = val.f0;
 					lastCount = 1;
 				}
 			}
@@ -392,33 +433,45 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(50);
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 20 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer, 150, 50);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
 
 			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
+				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, 1));
+				op.setKeyContextElement(next1);
+				op.processElement(next1);
+				
+				StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, 2));
+				op.setKeyContextElement(next2);
+				op.processElement(next2);
 			}
 
 			// each element should end up in the output three times
 			// wait until the elements have arrived 6 times in the output
 			out.waitForNElements(6, 120000);
 			
-			List<Integer> result = out.getElements();
+			List<Tuple2<Integer, Integer>> result = out.getElements();
 			assertEquals(6, result.size());
 			
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
+			Collections.sort(result, tupleComparator);
+			assertEquals(Arrays.asList(
+					new Tuple2<>(1, 1),
+					new Tuple2<>(1, 1),
+					new Tuple2<>(1, 1),
+					new Tuple2<>(2, 2),
+					new Tuple2<>(2, 2),
+					new Tuple2<>(2, 2)
+			), result);
 
 			synchronized (lock) {
 				op.close();
@@ -438,15 +491,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void testEmitTrailingDataOnClose() {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 			
 			// the operator has a window time that is so long that it will not fire in this test
 			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op = 
-					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, oneYear, oneYear);
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer, oneYear, oneYear);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
 			op.open();
@@ -454,7 +508,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 			for (Integer i : data) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 			}
 
@@ -464,9 +520,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 			
 			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
+			List<Tuple2<Integer, Integer>> result = out.getElements();
+			assertEquals(data.size(), result.size());
+			
+			Collections.sort(result, tupleComparator);
+			for (int i = 0; i < data.size(); i++) {
+				assertEquals(data.get(i), result.get(i).f0);
+				assertEquals(data.get(i), result.get(i).f1);
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -481,18 +542,18 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 	public void testPropagateExceptionsFromProcessElement() {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
-			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
+			ReduceFunction<Tuple2<Integer, Integer>> failingFunction = new FailingFunction(100);
 
 			// the operator has a window time that is so long that it will not fire in this test
 			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							failingFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							hundredYears, hundredYears);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -500,12 +561,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			for (int i = 0; i < 100; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(1));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1)); 
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 			}
 			
 			try {
-				op.processElement(new StreamRecord<Integer>(1));
+				StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(1, 1));
+				op.setKeyContextElement(next);
+				op.processElement(next);
 				fail("This fail with an exception");
 			}
 			catch (Exception e) {
@@ -528,15 +593,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
 		try {
 			final int windowSize = 200;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSize);
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// tumbling window that triggers every 50 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -548,14 +613,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			
 			for (int i = 0; i < numElementsFirst; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
 
 			// draw a snapshot and dispose the window
 			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
+			List<Tuple2<Integer, Integer>> resultAtSnapshot;
 			synchronized (lock) {
 				int beforeSnapShot = out.getElements().size();
 				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
@@ -569,7 +636,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// inject some random elements, which should not show up in the state
 			for (int i = numElementsFirst; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -577,10 +646,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
+			final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSize);
 			op = new AggregatingProcessingTimeWindowOperator<>(
-					sumFunction, identitySelector,
-					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					sumFunction, fieldOneSelector,
+					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
@@ -590,7 +659,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// inject the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -601,13 +672,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
 			finalResult.addAll(out2.getElements());
 			assertEquals(numElements, finalResult.size());
 
-			Collections.sort(finalResult);
+			Collections.sort(finalResult, tupleComparator);
 			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, finalResult.get(i).intValue());
+				assertEquals(i, finalResult.get(i).f0.intValue());
+				assertEquals(i, finalResult.get(i).f1.intValue());
 			}
 		}
 		catch (Exception e) {
@@ -627,15 +699,15 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			final int windowSlide = 50;
 			final int windowSize = factor * windowSlide;
 
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>(windowSlide);
 			final Object lock = new Object();
 			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
 
 			// sliding window (200 msecs) every 50 msecs
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
 					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+							sumFunction, fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer,
 							windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out);
@@ -647,14 +719,16 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 			for (int i = 0; i < numElementsFirst; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
 
 			// draw a snapshot
 			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
+			List<Tuple2<Integer, Integer>> resultAtSnapshot;
 			synchronized (lock) {
 				int beforeSnapShot = out.getElements().size();
 				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
@@ -668,7 +742,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// inject the remaining elements - these should not influence the snapshot
 			for (int i = numElementsFirst; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -676,10 +752,10 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
+			final CollectingOutput<Tuple2<Integer, Integer>> out2 = new CollectingOutput<>(windowSlide);
 			op = new AggregatingProcessingTimeWindowOperator<>(
-					sumFunction, identitySelector,
-					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
+					sumFunction, fieldOneSelector,
+					IntSerializer.INSTANCE, tupleSerializer,
 					windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
@@ -690,7 +766,9 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			// inject again the remaining elements
 			for (int i = numElementsFirst; i < numElements; i++) {
 				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
+					StreamRecord<Tuple2<Integer, Integer>> next = new StreamRecord<>(new Tuple2<>(i, i));
+					op.setKeyContextElement(next);
+					op.processElement(next);
 				}
 				Thread.sleep(1);
 			}
@@ -710,13 +788,14 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			op.dispose();
 
 			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
+			List<Tuple2<Integer, Integer>> finalResult = new ArrayList<>(resultAtSnapshot);
 			finalResult.addAll(out2.getElements());
 			assertEquals(factor * numElements, finalResult.size());
 
-			Collections.sort(finalResult);
+			Collections.sort(finalResult, tupleComparator);
 			for (int i = 0; i < factor * numElements; i++) {
-				assertEquals(i / factor, finalResult.get(i).intValue());
+				assertEquals(i / factor, finalResult.get(i).f0.intValue());
+				assertEquals(i / factor, finalResult.get(i).f1.intValue());
 			}
 		}
 		catch (Exception e) {
@@ -727,6 +806,134 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 			timerService.shutdown();
 		}
 	}
+
+	@Test
+	public void testKeyValueStateInWindowFunctionTumbling() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+		try {
+			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
+			
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+			StatefulFunction.globalCounts.clear();
+			
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							new StatefulFunction(), fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer, hundredYears, hundredYears);
+
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out);
+			op.open();
+
+			// because the window interval is so large, everything should be in one window
+			// and aggregate into one value per key
+			
+			synchronized (lock) {
+				for (int i = 0; i < 10; i++) {
+					StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
+					op.setKeyContextElement(next1);
+					op.processElement(next1);
+	
+					StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
+					op.setKeyContextElement(next2);
+					op.processElement(next2);
+				}
+
+				op.close();
+			}
+
+			List<Tuple2<Integer, Integer>> result = out.getElements();
+			assertEquals(2, result.size());
+
+			Collections.sort(result, tupleComparator);
+			assertEquals(45, result.get(0).f1.intValue());
+			assertEquals(45, result.get(1).f1.intValue());
+
+			assertEquals(10, StatefulFunction.globalCounts.get(1).intValue());
+			assertEquals(10, StatefulFunction.globalCounts.get(2).intValue());
+		
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			timerService.shutdown();
+		}
+	}
+
+	@Test
+	public void testKeyValueStateInWindowFunctionSliding() {
+		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
+		try {
+			final int factor = 2;
+			final int windowSlide = 50;
+			final int windowSize = factor * windowSlide;
+			
+			final CollectingOutput<Tuple2<Integer, Integer>> out = new CollectingOutput<>();
+			final Object lock = new Object();
+			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
+
+			StatefulFunction.globalCounts.clear();
+			
+			AggregatingProcessingTimeWindowOperator<Integer, Tuple2<Integer, Integer>> op =
+					new AggregatingProcessingTimeWindowOperator<>(
+							new StatefulFunction(), fieldOneSelector,
+							IntSerializer.INSTANCE, tupleSerializer, windowSize, windowSlide);
+
+			op.setup(mockTask, createTaskConfig(fieldOneSelector, IntSerializer.INSTANCE), out);
+			op.open();
+
+			// because the window interval is so large, everything should be in one window
+			// and aggregate into one value per key
+			final int numElements = 100;
+			
+			// because we do not release the lock here, these elements
+			for (int i = 0; i < numElements; i++) {
+				
+				StreamRecord<Tuple2<Integer, Integer>> next1 = new StreamRecord<>(new Tuple2<>(1, i));
+				StreamRecord<Tuple2<Integer, Integer>> next2 = new StreamRecord<>(new Tuple2<>(2, i));
+				StreamRecord<Tuple2<Integer, Integer>> next3 = new StreamRecord<>(new Tuple2<>(1, i));
+				StreamRecord<Tuple2<Integer, Integer>> next4 = new StreamRecord<>(new Tuple2<>(2, i));
+
+				// because we do not release the lock between elements, they end up in the same windows
+				synchronized (lock) {
+					op.setKeyContextElement(next1);
+					op.processElement(next1);
+					op.setKeyContextElement(next2);
+					op.processElement(next2);
+					op.setKeyContextElement(next3);
+					op.processElement(next3);
+					op.setKeyContextElement(next4);
+					op.processElement(next4);
+				}
+
+				Thread.sleep(1);
+			}
+			
+			synchronized (lock) {
+				op.close();
+			}
+
+			int count1 = StatefulFunction.globalCounts.get(1);
+			int count2 = StatefulFunction.globalCounts.get(2);
+			
+			assertTrue(count1 >= 2 && count1 <= 2 * numElements);
+			assertEquals(count1, count2);
+			
+			op.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			timerService.shutdown();
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	
@@ -748,7 +955,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 
 	// ------------------------------------------------------------------------
 	
-	private static class FailingFunction implements ReduceFunction<Integer> {
+	private static class FailingFunction implements ReduceFunction<Tuple2<Integer, Integer>> {
 
 		private final int failAfterElements;
 		
@@ -759,16 +966,44 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		}
 
 		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
 			numElements++;
 
 			if (numElements >= failAfterElements) {
 				throw new Exception("Artificial Test Exception");
 			}
 			
-			return value1 + value2;
+			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
 		}
 	}
+
+	// ------------------------------------------------------------------------
+
+	private static class StatefulFunction extends RichReduceFunction<Tuple2<Integer, Integer>> {
+
+		static final Map<Integer, Integer> globalCounts = new ConcurrentHashMap<>();
+
+		private OperatorState<Integer> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			assertNotNull(getRuntimeContext());
+			
+			// start with one, so the final count is correct and we test that we do not
+			// initialize with 0 always by default
+			state = getRuntimeContext().getKeyValueState("totalCount", Integer.class, 1);
+		}
+
+		@Override
+		public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
+			state.update(state.value() + 1);
+			globalCounts.put(value1.f0, state.value());
+			
+			return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
+		}
+	}
+
+	// ------------------------------------------------------------------------
 	
 	private static StreamTask<?, ?> createMockTask() {
 		StreamTask<?, ?> task = mock(StreamTask.class);
@@ -820,4 +1055,11 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 		
 		return mockTask;
 	}
+
+	private static StreamConfig createTaskConfig(KeySelector<?, ?> partitioner, TypeSerializer<?> keySerializer) {
+		StreamConfig cfg = new StreamConfig(new Configuration());
+		cfg.setStatePartitioner(partitioner);
+		cfg.setStateKeySerializer(keySerializer);
+		return cfg;
+	}
 }


[3/3] flink git commit: [FLINK-2866] [runtime] Eagerly close FSDataInputStream in file state handle

Posted by se...@apache.org.
[FLINK-2866] [runtime] Eagerly close FSDataInputStream in file state handle

This closes #1282


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2811ce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2811ce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2811ce9

Branch: refs/heads/release-0.10
Commit: c2811ce975548e18d64a5f3c2b1b397f9e42bc1c
Parents: ec1730b
Author: tedyu <yu...@gmail.com>
Authored: Wed Oct 21 19:40:21 2015 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/filesystem/FileSerializableStateHandle.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
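The fix wraps the stream in a try-with-resources block so the FSDataInputStream is closed as soon as the state has been deserialized, even when readObject() throws. A minimal, self-contained sketch of the same pattern, using an in-memory stream instead of a file system stream:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;

    // Eager-close sketch: both streams are closed on return and on exception,
    // instead of relying on the caller or the garbage collector.
    public final class EagerCloseSketch {
        static Object readState(byte[] serialized) throws IOException, ClassNotFoundException {
            try (ByteArrayInputStream in = new ByteArrayInputStream(serialized);
                 ObjectInputStream ois = new ObjectInputStream(in)) {
                return ois.readObject();
            }
        }
    }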


http://git-wip-us.apache.org/repos/asf/flink/blob/c2811ce9/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
index b7e7cd1..63336d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
@@ -46,8 +46,9 @@ public class FileSerializableStateHandle<T> extends AbstractFileState implements
 	@Override
 	@SuppressWarnings("unchecked")
 	public T getState(ClassLoader classLoader) throws Exception {
-		FSDataInputStream inStream = getFileSystem().open(getFilePath());
-		ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
-		return (T) ois.readObject();
+		try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
+			ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
+			return (T) ois.readObject();
+		}
 	}
 }


[2/3] flink git commit: [FLINK-2888] [streaming] State backends return copies of the default values

Posted by se...@apache.org.
[FLINK-2888] [streaming] State backends return copies of the default values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4cb0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4cb0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4cb0ae

Branch: refs/heads/release-0.10
Commit: 8e4cb0ae0bdc7f8c60542edabad57e4fa2f0c61e
Parents: c2811ce
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 11:24:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/AbstractHeapKvState.java      |  3 +-
 .../runtime/state/FileStateBackendTest.java     | 40 ++++++++++++++++----
 .../runtime/state/MemoryStateBackendTest.java   | 31 ++++++++++++---
 3 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
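The motivation: if value() handed out the shared default instance itself, a function that mutates the returned value (a mutable type such as IntValue) would silently change the default seen by every other key. Copying the default through the value serializer, as in the hunk below, gives each key its own instance. A small illustration of the hazard with a shared mutable default (plain Java, not the backend API):

    import org.apache.flink.types.IntValue;

    // Illustration only: returning the shared default instead of a copy lets one
    // key's update leak into what other keys observe as their "default" value.
    public final class SharedDefaultHazard {
        public static void main(String[] args) {
            IntValue sharedDefault = new IntValue(-1);

            IntValue forKey1 = sharedDefault;        // what a non-copying backend would return
            forKey1.setValue(42);                    // user code updates "its" state

            IntValue forKey2 = sharedDefault;        // another key asks for its default value
            System.out.println(forKey2.getValue());  // and sees 42 instead of -1
        }
    }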


http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
index 12250b9..23703b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Bac
 	@Override
 	public V value() {
 		V value = state.get(currentKey);
-		return value != null ? value : defaultValue;
+		return value != null ? value : 
+				(defaultValue == null ? null : valueSerializer.copy(defaultValue));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 481fb98..7182a36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
 
 import org.junit.Test;
 
@@ -382,6 +378,36 @@ public class FileStateBackendTest {
 			deleteDirectorySilently(tempDir);
 		}
 	}
+
+	@Test
+	public void testCopyDefaultValue() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+			
+			KvState<Integer, IntValue, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+			kv.setCurrentKey(1);
+			IntValue default1 = kv.value();
+
+			kv.setCurrentKey(2);
+			IntValue default2 = kv.value();
+
+			assertNotNull(default1);
+			assertNotNull(default2);
+			assertEquals(default1, default2);
+			assertFalse(default1 == default2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities
@@ -411,7 +437,7 @@ public class FileStateBackendTest {
 	}
 	
 	private static String localFileUri(File path) {
-		return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+		return path.toURI().toString();
 	}
 	
 	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 5f95b33..87a050b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -21,14 +21,11 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
@@ -279,4 +276,28 @@ public class MemoryStateBackendTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testCopyDefaultValue() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			KvState<Integer, IntValue, MemoryStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+			kv.setCurrentKey(1);
+			IntValue default1 = kv.value();
+
+			kv.setCurrentKey(2);
+			IntValue default2 = kv.value();
+			
+			assertNotNull(default1);
+			assertNotNull(default2);
+			assertEquals(default1, default2);
+			assertFalse(default1 == default2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }