You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:14:58 UTC

[02/12] flink git commit: Move window operators and tests to windowing package

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 19801f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
-
-	@SuppressWarnings("unchecked")
-	private final KeyedWindowFunction<String, String, String> mockFunction = mock(KeyedWindowFunction.class);
-
-	@SuppressWarnings("unchecked")
-	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
-	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
-		@Override
-		public Integer getKey(Integer value) {
-			return value;
-		}
-	};
-	
-	private final KeyedWindowFunction<Integer, Integer, Integer> validatingIdentityFunction = 
-			new KeyedWindowFunction<Integer, Integer, Integer>()
-	{
-		@Override
-		public void evaluate(Integer key, Iterable<Integer> values, Collector<Integer> out) {
-			for (Integer val : values) {
-				assertEquals(key, val);
-				out.collect(val);
-			}
-		}
-	};
-
-	// ------------------------------------------------------------------------
-
-	@After
-	public void checkNoTriggerThreadsRunning() {
-		// make sure that all the threads we trigger are shut down
-		long deadline = System.currentTimeMillis() + 5000;
-		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException ignored) {}
-		}
-
-		assertTrue("Not all trigger threads where properly shut down",
-				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testInvalidParameters() {
-		try {
-			assertInvalidParameter(-1L, -1L);
-			assertInvalidParameter(10000L, -1L);
-			assertInvalidParameter(-1L, 1000L);
-			assertInvalidParameter(1000L, 2000L);
-			
-			// actual internal slide is too low here:
-			assertInvalidParameter(1000L, 999L);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWindowSizeAndSlide() {
-		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-			
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			assertEquals(5000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(5, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			assertEquals(1000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(1, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			assertEquals(1500, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(500, op.getPaneSize());
-			assertEquals(3, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			assertEquals(1200, op.getWindowSize());
-			assertEquals(1100, op.getWindowSlide());
-			assertEquals(100, op.getPaneSize());
-			assertEquals(12, op.getNumPanesPerWindow());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testWindowTriggerTimeAlignment() {
-		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 500 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 100 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingWindow() {
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			// tumbling window of 50 milliseconds (window size and slide are both windowSize)
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector, windowSize, windowSize);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
-				Thread.sleep(1);
-			}
-
-			op.close();
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			assertEquals(numElements, result.size());
-
-			Collections.sort(result);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, result.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSlidingWindow() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			// sliding window with a size of 150 milliseconds and a slide of 50 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
-				Thread.sleep(1);
-			}
-
-			op.close();
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-
-			// if we kept this running, each element would be in the result three times (once per slide).
-			// we are closing the operator before the final panes have gone through three windows, so we
-			// may have fewer elements.
-			if (result.size() < numElements || result.size() > 3 * numElements) {
-				fail("Wrong number of results: " + result.size());
-			}
-
-			Collections.sort(result);
-			int lastNum = -1;
-			int lastCount = -1;
-			
-			for (int num : result) {
-				if (num == lastNum) {
-					lastCount++;
-					assertTrue(lastCount <= 3);
-				}
-				else {
-					lastNum = num;
-					lastCount = 1;
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			final Object lock = new Object();
-
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
-			// tumbling window of 50 milliseconds (window size and slide are both 50)
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-			out.waitForNElements(2, 60000);
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(3));
-				op.processElement(new StreamRecord<Integer>(4));
-				op.processElement(new StreamRecord<Integer>(5));
-			}
-			out.waitForNElements(5, 60000);
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(6));
-			}
-			out.waitForNElements(6, 60000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
-
-			op.close();
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			final Object lock = new Object();
-
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
-			// sliding window with a size of 150 milliseconds and a slide of 50 milliseconds
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-			
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-			
-			op.close();
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op = 
-					new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
-							oneYear, oneYear);
-			
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				op.processElement(new StreamRecord<Integer>(i));
-			}
-			
-			op.close();
-			op.dispose();
-			
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPropagateExceptionsFromClose() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100);
-
-			// the operator has a window time that is so long that it will not fire in this test
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector, hundredYears, hundredYears);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			for (int i = 0; i < 150; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
-			}
-			
-			try {
-				op.close();
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertTrue(
-						e.getMessage().contains("Artificial Test Exception") ||
-						(e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
-			}
-
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private void assertInvalidParameter(long windowSize, long windowSlide) {
-		try {
-			new AccumulatingProcessingTimeWindowOperator<String, String, String>(
-					mockFunction, mockKeySelector, windowSize, windowSlide);
-			fail("This should fail with an IllegalArgumentException");
-		}
-		catch (IllegalArgumentException e) {
-			// expected
-		}
-		catch (Exception e) {
-			fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer> {
-
-		private final int failAfterElements;
-		
-		private int numElements;
-
-		FailingFunction(int failAfterElements) {
-			this.failAfterElements = failAfterElements;
-		}
-
-		@Override
-		public void evaluate(Integer integer, Iterable<Integer> values, Collector<Integer> out) throws Exception {
-			for (Integer i : values) {
-				out.collect(i);
-				numElements++;
-				
-				if (numElements >= failAfterElements) {
-					throw new Exception("Artificial Test Exception");
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 0ff974c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings("serial")
-public class AggregatingAlignedProcessingTimeWindowOperatorTest {
-
-	@SuppressWarnings("unchecked")
-	private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);
-
-	@SuppressWarnings("unchecked")
-	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
-	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
-		@Override
-		public Integer getKey(Integer value) {
-			return value;
-		}
-	};
-	
-	private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
-		@Override
-		public Integer reduce(Integer value1, Integer value2) {
-			return value1 + value2;
-		}
-	};
-
-	// ------------------------------------------------------------------------
-
-	@After
-	public void checkNoTriggerThreadsRunning() {
-		// make sure that all the threads we trigger are shut down
-		long deadline = System.currentTimeMillis() + 5000;
-		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException ignored) {}
-		}
-
-		assertTrue("Not all trigger threads where properly shut down",
-				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testInvalidParameters() {
-		try {
-			assertInvalidParameter(-1L, -1L);
-			assertInvalidParameter(10000L, -1L);
-			assertInvalidParameter(-1L, 1000L);
-			assertInvalidParameter(1000L, 2000L);
-			
-			// actual internal slide is too low here:
-			assertInvalidParameter(1000L, 999L);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWindowSizeAndSlide() {
-		try {
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-			
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			assertEquals(5000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(5, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			assertEquals(1000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(1, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			assertEquals(1500, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(500, op.getPaneSize());
-			assertEquals(3, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			assertEquals(1200, op.getWindowSize());
-			assertEquals(1100, op.getWindowSlide());
-			assertEquals(100, op.getPaneSize());
-			assertEquals(12, op.getNumPanesPerWindow());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testWindowTriggerTimeAlignment() {
-		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 500 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
-			op.setup(mockOut, mockContext);
-			op.open(new Configuration());
-			assertTrue(op.getNextSlideTime() % 100 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingWindowUniqueElements() {
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector, windowSize, windowSize);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
-				Thread.sleep(1);
-			}
-
-			op.close();
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			assertEquals(numElements, result.size());
-
-			Collections.sort(result);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, result.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void  testTumblingWindowDuplicateElements() {
-
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			final Object lock = new Object();
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector, windowSize, windowSize);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			final int numWindows = 10;
-
-			long previousNextTime = 0;
-			int window = 1;
-			
-			while (window <= numWindows) {
-				long nextTime = op.getNextEvaluationTime();
-				int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(val));
-				}
-				
-				if (nextTime != previousNextTime) {
-					window++;
-					previousNextTime = nextTime;
-				}
-				
-				Thread.sleep(1);
-			}
-
-			op.close();
-			op.dispose();
-			
-			List<Integer> result = out.getElements();
-			
-			// we have ideally one element per window. we may have more, when we emitted a value into the
-			// successive window (corner case), so we can have twice the number of elements, in the worst case.
-			assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
-
-			// deduplicate for more accurate checks
-			HashSet<Integer> set = new HashSet<>(result);
-			assertTrue(set.size() == 10 || set.size() == 11);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testSlidingWindow() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			// sliding window with a size of 150 milliseconds and a slide of 50 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				op.processElement(new StreamRecord<Integer>(i));
-				Thread.sleep(1);
-			}
-
-			op.close();
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			
-			// every element can occur between one and three times
-			if (result.size() < numElements || result.size() > 3 * numElements) {
-				System.out.println(result);
-				fail("Wrong number of results: " + result.size());
-			}
-
-			Collections.sort(result);
-			int lastNum = -1;
-			int lastCount = -1;
-			
-			for (int num : result) {
-				if (num == lastNum) {
-					lastCount++;
-					assertTrue(lastCount <= 3);
-				}
-				else {
-					lastNum = num;
-					lastCount = 1;
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			final Object lock = new Object();
-			doAnswer(new Answer() {
-				@Override
-				public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
-					final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-					final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-					timerService.schedule(
-							new Callable<Object>() {
-								@Override
-								public Object call() throws Exception {
-									synchronized (lock) {
-										target.trigger(timestamp);
-									}
-									return null;
-								}
-							},
-							timestamp - System.currentTimeMillis(),
-							TimeUnit.MILLISECONDS);
-					return null;
-				}
-			}).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
-			// tumbling window that triggers every 20 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-			
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-			
-			op.close();
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		} finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op = 
-					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);
-			
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				op.processElement(new StreamRecord<Integer>(i));
-			}
-			
-			op.close();
-			op.dispose();
-			
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPropagateExceptionsFromProcessElement() {
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-
-			final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
-			when(mockContext.getTaskName()).thenReturn("Test task name");
-
-			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
-
-			// the operator has a window time that is so long that it will not fire in this test
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector, hundredYears, hundredYears);
-
-			op.setup(out, mockContext);
-			op.open(new Configuration());
-
-			for (int i = 0; i < 100; i++) {
-				op.processElement(new StreamRecord<Integer>(1));
-			}
-			
-			try {
-				op.processElement(new StreamRecord<Integer>(1));
-				fail("This fail with an exception");
-			}
-			catch (Exception e) {
-				assertTrue(e.getMessage().contains("Artificial Test Exception"));
-			}
-
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private void assertInvalidParameter(long windowSize, long windowSlide) {
-		try {
-			new AggregatingProcessingTimeWindowOperator<String, String>(
-					mockFunction, mockKeySelector, windowSize, windowSlide);
-			fail("This should fail with an IllegalArgumentException");
-		}
-		catch (IllegalArgumentException e) {
-			// expected
-		}
-		catch (Exception e) {
-			fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class FailingFunction implements ReduceFunction<Integer> {
-
-		private final int failAfterElements;
-		
-		private int numElements;
-
-		FailingFunction(int failAfterElements) {
-			this.failAfterElements = failAfterElements;
-		}
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			numElements++;
-
-			if (numElements >= failAfterElements) {
-				throw new Exception("Artificial Test Exception");
-			}
-			
-			return value1 + value2;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
deleted file mode 100644
index 9f6858d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-	
-	private final List<T> elements = new ArrayList<>();
-
-	private final int timeStampModulus;
-
-
-	public CollectingOutput() {
-		this.timeStampModulus = 0;
-	}
-	
-	public CollectingOutput(int timeStampModulus) {
-		this.timeStampModulus = timeStampModulus;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	public List<T> getElements() {
-		return elements;
-	}
-	
-	public void waitForNElements(int n, long timeout) throws InterruptedException {
-		long deadline = System.currentTimeMillis() + timeout;
-		synchronized (elements) {
-			long now;
-			while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
-				elements.wait(deadline - now);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void emitWatermark(Watermark mark) {
-		throw new UnsupportedOperationException("the output should not emit watermarks");
-	}
-
-	@Override
-	public void collect(StreamRecord<T> record) {
-		elements.add(record.getValue());
-		
-		if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
-			throw new IllegalArgumentException("Invalid timestamp");
-		}
-		synchronized (elements) {
-			elements.notifyAll();
-		}
-	}
-
-	@Override
-	public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
deleted file mode 100644
index 2a9e203..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutIfAbsentTest {
-	
-	@Test
-	public void testPutIfAbsentUniqueKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-			IntegerFactory factory = new IntegerFactory();
-			
-			final int numElements = 1000000;
-			
-			for (int i = 0; i < numElements; i++) {
-				factory.set(2 * i + 1);
-				map.putIfAbsent(i, factory);
-
-				assertEquals(i+1, map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
-				assertTrue(map.size() <= map.getRehashThreshold());
-			}
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-			
-			for (int i = numElements - 1; i >= 0; i--) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testPutIfAbsentDuplicateKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-			IntegerFactory factory = new IntegerFactory();
-			
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				int val = 2 * i + 1;
-				factory.set(val);
-				Integer put = map.putIfAbsent(i, factory);
-				assertEquals(val, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i += 3) {
-				factory.set(2 * i);
-				Integer put = map.putIfAbsent(i, factory);
-				assertEquals(2 * i + 1, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private static class IntegerFactory implements KeyMap.LazyFactory<Integer> {
-		
-		private Integer toCreate;
-		
-		public void set(Integer toCreate) {
-			this.toCreate = toCreate;
-		}
-
-		@Override
-		public Integer create() {
-			return toCreate;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
deleted file mode 100644
index 7335976..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import java.util.BitSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutTest {
-
-	@Test
-	public void testPutUniqueKeysAndGrowth() {
-		try {
-			KeyMap<Integer, Integer> map = new KeyMap<>();
-
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				map.put(i, 2 * i + 1);
-
-				assertEquals(i+1, map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.size());
-				assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
-				assertTrue(map.size() <= map.getRehashThreshold());
-			}
-
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			for (int i = numElements - 1; i >= 0; i--) {
-				assertEquals(2 * i + 1, map.get(i).intValue());
-			}
-
-			BitSet bitset = new BitSet();
-			int numContained = 0;
-			for (KeyMap.Entry<Integer, Integer> entry : map) {
-				numContained++;
-				
-				assertEquals(entry.getKey() * 2 + 1, entry.getValue().intValue());
-				assertFalse(bitset.get(entry.getKey()));
-				bitset.set(entry.getKey());
-			}
-
-			assertEquals(numElements, numContained);
-			assertEquals(numElements, bitset.cardinality());
-			
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPutDuplicateKeysAndGrowth() {
-		try {
-			final KeyMap<Integer, Integer> map = new KeyMap<>();
-			final int numElements = 1000000;
-
-			for (int i = 0; i < numElements; i++) {
-				Integer put = map.put(i, 2*i+1);
-				assertNull(put);
-			}
-
-			for (int i = 0; i < numElements; i += 3) {
-				Integer put = map.put(i, 2*i);
-				assertNotNull(put);
-				assertEquals(2*i+1, put.intValue());
-			}
-
-			for (int i = 0; i < numElements; i++) {
-				int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
-				assertEquals(expected, map.get(i).intValue());
-			}
-			
-			assertEquals(numElements, map.size());
-			assertEquals(numElements, map.traverseAndCountElements());
-			assertEquals(1 << 21, map.getCurrentTableCapacity());
-			assertTrue(map.getLongestChainLength() <= 7);
-
-			
-			BitSet bitset = new BitSet();
-			int numContained = 0;
-			for (KeyMap.Entry<Integer, Integer> entry : map) {
-				numContained++;
-
-				int key = entry.getKey();
-				int expected = key % 3 == 0 ? (2*key) : (2*key+1);
-
-				assertEquals(expected, entry.getValue().intValue());
-				assertFalse(bitset.get(key));
-				bitset.set(key);
-			}
-
-			assertEquals(numElements, numContained);
-			assertEquals(numElements, bitset.cardinality());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
deleted file mode 100644
index be71af2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class KeyMapTest {
-	
-	@Test
-	public void testInitialSizeComputation() {
-		try {
-			KeyMap<String, String> map;
-
-			map = new KeyMap<>();
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-			
-			map = new KeyMap<>(0);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(1);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(9);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(63);
-			assertEquals(64, map.getCurrentTableCapacity());
-			assertEquals(6, map.getLog2TableCapacity());
-			assertEquals(24, map.getShift());
-			assertEquals(48, map.getRehashThreshold());
-
-			map = new KeyMap<>(64);
-			assertEquals(128, map.getCurrentTableCapacity());
-			assertEquals(7, map.getLog2TableCapacity());
-			assertEquals(23, map.getShift());
-			assertEquals(96, map.getRehashThreshold());
-
-			map = new KeyMap<>(500);
-			assertEquals(512, map.getCurrentTableCapacity());
-			assertEquals(9, map.getLog2TableCapacity());
-			assertEquals(21, map.getShift());
-			assertEquals(384, map.getRehashThreshold());
-
-			map = new KeyMap<>(127);
-			assertEquals(128, map.getCurrentTableCapacity());
-			assertEquals(7, map.getLog2TableCapacity());
-			assertEquals(23, map.getShift());
-			assertEquals(96, map.getRehashThreshold());
-			
-			// no negative number of elements
-			try {
-				new KeyMap<>(-1);
-				fail("should fail with an exception");
-			}
-			catch (IllegalArgumentException e) {
-				// expected
-			}
-			
-			// check integer overflow
-			try {
-				map = new KeyMap<>(0x65715522);
-
-				final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE);
-				assertEquals(Integer.highestOneBit(Integer.MAX_VALUE), map.getCurrentTableCapacity());
-				assertEquals(30, map.getLog2TableCapacity());
-				assertEquals(0, map.getShift());
-				assertEquals(maxCap / 4 * 3, map.getRehashThreshold());
-			}
-			catch (OutOfMemoryError e) {
-				// this may indeed happen in small test setups. we tolerate this in this test
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testPutAndGetRandom() {
-		try {
-			final KeyMap<Integer, Integer> map = new KeyMap<>();
-			final Random rnd = new Random();
-			
-			final long seed = rnd.nextLong();
-			final int numElements = 10000;
-			
-			final HashMap<Integer, Integer> groundTruth = new HashMap<>();
-			
-			rnd.setSeed(seed);
-			for (int i = 0; i < numElements; i++) {
-				Integer key = rnd.nextInt();
-				Integer value = rnd.nextInt();
-				
-				if (rnd.nextBoolean()) {
-					groundTruth.put(key, value);
-					map.put(key, value);
-				}
-			}
-
-			rnd.setSeed(seed);
-			for (int i = 0; i < numElements; i++) {
-				Integer key = rnd.nextInt();
-
-				// skip these, evaluating it is tricky due to duplicates
-				rnd.nextInt();
-				rnd.nextBoolean();
-				
-				Integer expected = groundTruth.get(key);
-				if (expected == null) {
-					assertNull(map.get(key));
-				}
-				else {
-					Integer contained = map.get(key);
-					assertNotNull(contained);
-					assertEquals(expected, contained);
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testConjunctTraversal() {
-		try {
-			final Random rootRnd = new Random(654685486325439L);
-			
-			final int numMaps = 7;
-			final int numKeys = 1000000;
-
-			// ------ create a set of maps ------
-			@SuppressWarnings("unchecked")
-			final KeyMap<Integer, Integer>[] maps = (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps];
-			for (int i = 0; i < numMaps; i++) {
-				maps[i] = new KeyMap<>();
-			}
-			
-			// ------ prepare probabilities for maps ------
-			final double[] probabilities = new double[numMaps];
-			final double[] probabilitiesTemp = new double[numMaps];
-			{
-				probabilities[0] = 0.5;
-				double remainingProb = 1.0 - probabilities[0];
-				for (int i = 1; i < numMaps - 1; i++) {
-					remainingProb /= 2;
-					probabilities[i] = remainingProb;
-				}
-
-				// compensate for rounding errors
-				probabilities[numMaps - 1] = remainingProb;
-			}
-			
-			// ------ generate random elements ------
-			final long probSeed = rootRnd.nextLong();
-			final long keySeed = rootRnd.nextLong();
-			
-			final Random probRnd = new Random(probSeed);
-			final Random keyRnd = new Random(keySeed);
-			
-			final int maxStride = Integer.MAX_VALUE / numKeys;
-			
-			int totalNumElements = 0;
-			int nextKeyValue = 1;
-			
-			for (int i = 0; i < numKeys; i++) {
-				int numCopies = (nextKeyValue % 3) + 1;
-				System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps);
-				
-				double totalProb = 1.0;
-				for (int copy = 0; copy < numCopies; copy++) {
-					int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd);
-					totalProb -= probabilitiesTemp[pos];
-					probabilitiesTemp[pos] = 0.0;
-					
-					Integer boxed = nextKeyValue;
-					Integer previous = maps[pos].put(boxed, boxed);
-					assertNull("Test problem - test does not assign unique maps", previous);
-				}
-				
-				totalNumElements += numCopies;
-				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
-			}
-			
-			
-			// check that all maps contain the total number of elements
-			int numContained = 0;
-			for (KeyMap<?, ?> map : maps) {
-				numContained += map.size();
-			}
-			assertEquals(totalNumElements, numContained);
-
-			// ------ check that all elements can be found in the maps ------
-			keyRnd.setSeed(keySeed);
-			
-			numContained = 0;
-			nextKeyValue = 1;
-			for (int i = 0; i < numKeys; i++) {
-				int numCopiesExpected = (nextKeyValue % 3) + 1;
-				int numCopiesContained = 0;
-				
-				for (KeyMap<Integer, Integer> map : maps) {
-					Integer val = map.get(nextKeyValue);
-					if (val != null) {
-						assertEquals(nextKeyValue, val.intValue());
-						numCopiesContained++;
-					}
-				}
-				
-				assertEquals(numCopiesExpected, numCopiesContained);
-				numContained += numCopiesContained;
-				
-				nextKeyValue += keyRnd.nextInt(maxStride) + 1;
-			}
-			assertEquals(totalNumElements, numContained);
-
-			// ------ make a traversal over all keys and validate the keys in the traversal ------
-			final int[] keysStartedAndFinished = { 0, 0 };
-			KeyMap.TraversalEvaluator<Integer, Integer> traversal = new KeyMap.TraversalEvaluator<Integer, Integer>() {
-
-				private int key;
-				private int valueCount;
-				
-				@Override
-				public void startNewKey(Integer key) {
-					this.key = key;
-					this.valueCount = 0;
-					
-					keysStartedAndFinished[0]++;
-				}
-
-				@Override
-				public void nextValue(Integer value) {
-					assertEquals(this.key, value.intValue());
-					this.valueCount++;
-				}
-
-				@Override
-				public void keyDone() {
-					int expected = (key % 3) + 1;
-					if (expected != valueCount) {
-						fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount);
-					}
-					
-					keysStartedAndFinished[1]++;
-				}
-			};
-			
-			KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17);
-			
-			assertEquals(numKeys, keysStartedAndFinished[0]);
-			assertEquals(numKeys, keysStartedAndFinished[1]);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSizeComparator() {
-		try {
-			KeyMap<String, String> map1 = new KeyMap<>(5);
-			KeyMap<String, String> map2 = new KeyMap<>(80);
-			
-			assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity());
-			
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0);
-			assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-
-	private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) {
-		double val = rnd.nextDouble() * totalProbability;
-		
-		double accum = 0;
-		for (int i = 0; i < array.length; i++) {
-			accum += array[i];
-			if (val <= accum && array[i] > 0.0) {
-				return i;
-			}
-		}
-		
-		// in case of rounding errors
-		return array.length - 1;
-	}
-	
-	private static <E> E[] shuffleArray(E[] array, Random rnd) {
-		E[] target = Arrays.copyOf(array, array.length);
-		
-		for (int i = target.length - 1; i > 0; i--) {
-			int swapPos = rnd.nextInt(i + 1);
-			E temp = target[i];
-			target[i] = target[swapPos];
-			target[swapPos] = temp;
-		}
-		
-		return target;
-	}
-}