You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:39 UTC

[23/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
deleted file mode 100644
index 0ddf272..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@SuppressWarnings("serial")
-public class StreamTaskTimerTest {
-
-	@Test
-	public void testOpenCloseAndTimestamps() throws Exception {
-		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-		
-		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		
-		StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
-		streamConfig.setStreamOperator(mapOperator);
-
-		testHarness.invoke();
-
-		// first one spawns thread
-		mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() {
-			@Override
-			public void trigger(long timestamp) {}
-		});
-
-		assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
-
-
-		testHarness.endInput();
-		testHarness.waitForTaskCompletion();
-
-		// thread needs to die in time
-		long deadline = System.currentTimeMillis() + 4000;
-		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-			Thread.sleep(10);
-		}
-
-		assertEquals("Trigger timer thread did not properly shut down",
-				0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
-	}
-	
-	@Test
-	public void checkScheduledTimestampe() {
-		try {
-			final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-			final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-			StreamConfig streamConfig = testHarness.getStreamConfig();
-			StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
-			streamConfig.setStreamOperator(mapOperator);
-
-			testHarness.invoke();
-
-			final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-			final long t1 = System.currentTimeMillis();
-			final long t2 = System.currentTimeMillis() - 200;
-			final long t3 = System.currentTimeMillis() + 100;
-			final long t4 = System.currentTimeMillis() + 200;
-
-			mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
-			mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
-			mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
-			mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
-
-			long deadline = System.currentTimeMillis() + 20000;
-			while (errorRef.get() == null &&
-					ValidatingTriggerable.numInSequence < 4 &&
-					System.currentTimeMillis() < deadline)
-			{
-				Thread.sleep(100);
-			}
-
-			// handle errors
-			if (errorRef.get() != null) {
-				errorRef.get().printStackTrace();
-				fail(errorRef.get().getMessage());
-			}
-
-			assertEquals(4, ValidatingTriggerable.numInSequence);
-
-			testHarness.endInput();
-			testHarness.waitForTaskCompletion();
-
-			// wait until the trigger thread is shut down. otherwise, the other tests may become unstable
-			deadline = System.currentTimeMillis() + 4000;
-			while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-				Thread.sleep(10);
-			}
-
-			assertEquals("Trigger timer thread did not properly shut down",
-					0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	private static class ValidatingTriggerable implements Triggerable {
-		
-		static int numInSequence;
-		
-		private final AtomicReference<Throwable> errorRef;
-		
-		private final long expectedTimestamp;
-		private final int expectedInSequence;
-
-		private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
-			this.errorRef = errorRef;
-			this.expectedTimestamp = expectedTimestamp;
-			this.expectedInSequence = expectedInSequence;
-		}
-
-		@Override
-		public void trigger(long timestamp) {
-			try {
-				assertEquals(expectedTimestamp, timestamp);
-				assertEquals(expectedInSequence, numInSequence);
-				numInSequence++;
-			}
-			catch (Throwable t) {
-				errorRef.compareAndSet(null, t);
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	public static class DummyMapFunction<T> implements MapFunction<T, T> {
-		@Override
-		public T map(T value) { return value; }
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index ad3c838..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,824 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.Collector;
-
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
-public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
-
-	@SuppressWarnings("unchecked")
-	private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
-
-	@SuppressWarnings("unchecked")
-	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
-	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
-		@Override
-		public Integer getKey(Integer value) {
-			return value;
-		}
-	};
-	
-	private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
-			new WindowFunction<Integer, Integer, Integer, TimeWindow>()
-	{
-		@Override
-		public void apply(Integer key,
-				TimeWindow window,
-				Iterable<Integer> values,
-				Collector<Integer> out) {
-			for (Integer val : values) {
-				assertEquals(key, val);
-				out.collect(val);
-			}
-		}
-	};
-
-	// ------------------------------------------------------------------------
-
-	@After
-	public void checkNoTriggerThreadsRunning() {
-		// make sure that all the threads we trigger are shut down
-		long deadline = System.currentTimeMillis() + 5000;
-		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException ignored) {}
-		}
-
-		assertTrue("Not all trigger threads where properly shut down",
-				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testInvalidParameters() {
-		try {
-			assertInvalidParameter(-1L, -1L);
-			assertInvalidParameter(10000L, -1L);
-			assertInvalidParameter(-1L, 1000L);
-			assertInvalidParameter(1000L, 2000L);
-			
-			// actual internal slide is too low here:
-			assertInvalidParameter(1000L, 999L);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWindowSizeAndSlide() {
-		try {
-			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
-			
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			assertEquals(5000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(5, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			assertEquals(1000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(1, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			assertEquals(1500, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(500, op.getPaneSize());
-			assertEquals(3, op.getNumPanesPerWindow());
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			assertEquals(1200, op.getWindowSize());
-			assertEquals(1100, op.getWindowSlide());
-			assertEquals(100, op.getPaneSize());
-			assertEquals(12, op.getNumPanesPerWindow());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testWindowTriggerTimeAlignment() {
-		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTask();
-
-			AccumulatingProcessingTimeWindowOperator<String, String, String> op;
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 500 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 100 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingWindow() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 50 milliseconds (windowSize)
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			assertEquals(numElements, result.size());
-
-			Collections.sort(result);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, result.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testSlidingWindow() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			// sliding window of 150 milliseconds that triggers every 50 milliseconds
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-
-			// if we kept this running, each element would be in the result three times (once per slide).
-			// we close the operator before the final panes have been emitted three times, so we may have
-			// fewer elements.
-			if (result.size() < numElements || result.size() > 3 * numElements) {
-				fail("Wrong number of results: " + result.size());
-			}
-
-			Collections.sort(result);
-			int lastNum = -1;
-			int lastCount = -1;
-			
-			for (int num : result) {
-				if (num == lastNum) {
-					lastCount++;
-					assertTrue(lastCount <= 3);
-				}
-				else {
-					lastNum = num;
-					lastCount = 1;
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testTumblingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 50 milliseconds
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-			out.waitForNElements(2, 60000);
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(3));
-				op.processElement(new StreamRecord<Integer>(4));
-				op.processElement(new StreamRecord<Integer>(5));
-			}
-			out.waitForNElements(5, 60000);
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(6));
-			}
-			out.waitForNElements(6, 60000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// sliding window of 150 milliseconds that triggers every 50 milliseconds
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-			
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							oneYear, oneYear);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-			
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testPropagateExceptionsFromClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
-
-			// the operator has a window time that is so long that it will not fire in this test
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							hundredYears, hundredYears);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			for (int i = 0; i < 150; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-			}
-			
-			try {
-				synchronized (lock) {
-					op.close();
-				}
-				fail("This should fail with an exception");
-			}
-			catch (Exception e) {
-				assertTrue(
-						e.getMessage().contains("Artificial Test Exception") ||
-						(e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
-			}
-
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void checkpointRestoreWithPendingWindowTumbling() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int windowSize = 200;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 200 milliseconds (windowSize)
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			// inject some elements
-			final int numElementsFirst = 700;
-			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// draw a snapshot and dispose the window
-			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size(); 
-				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
-
-			// inject some additional elements after the snapshot, which should not show up in the state
-			for (int i = 0; i < 300; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i + numElementsFirst));
-				}
-				Thread.sleep(1);
-			}
-			
-			op.dispose();
-			
-			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
-			op = new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
-			op.open();
-
-			// inject some more elements
-			final int numElements = 1000;
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
-			assertEquals(numElements, finalResult.size());
-
-			Collections.sort(finalResult);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, finalResult.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void checkpointRestoreWithPendingWindowSliding() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int factor = 4;
-			final int windowSlide = 50;
-			final int windowSize = factor * windowSlide;
-			
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// sliding window (200 msecs) every 50 msecs
-			AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
-					new AccumulatingProcessingTimeWindowOperator<>(
-							validatingIdentityFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSlide);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			// inject some elements
-			final int numElements = 1000;
-			final int numElementsFirst = 700;
-			
-			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// draw a snapshot
-			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size();
-				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
-			
-			assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
-
-			// inject the remaining elements - these should not influence the snapshot
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-			
-			op.dispose();
-			
-			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
-			op = new AccumulatingProcessingTimeWindowOperator<>(
-					validatingIdentityFunction, identitySelector,
-					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-					windowSize, windowSlide);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
-			op.open();
-			
-
-			// inject again the remaining elements
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// for a deterministic result, we need to wait until all pending triggers
-			// have fired and emitted their results
-			long deadline = System.currentTimeMillis() + 120000;
-			do {
-				Thread.sleep(20);
-			}
-			while (resultAtSnapshot.size() + out2.getElements().size() < factor * numElements
-					&& System.currentTimeMillis() < deadline);
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
-			assertEquals(factor * numElements, finalResult.size());
-
-			Collections.sort(finalResult);
-			for (int i = 0; i < factor * numElements; i++) {
-				assertEquals(i / factor, finalResult.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private void assertInvalidParameter(long windowSize, long windowSlide) {
-		try {
-			new AccumulatingProcessingTimeWindowOperator<String, String, String>(
-					mockFunction, mockKeySelector, 
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE,
-					windowSize, windowSlide);
-			fail("This should fail with an IllegalArgumentException");
-		}
-		catch (IllegalArgumentException e) {
-			// expected
-		}
-		catch (Exception e) {
-			fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
-
-		private final int failAfterElements;
-		
-		private int numElements;
-
-		FailingFunction(int failAfterElements) {
-			this.failAfterElements = failAfterElements;
-		}
-
-		@Override
-		public void apply(Integer integer,
-				TimeWindow window,
-				Iterable<Integer> values,
-				Collector<Integer> out) throws Exception {
-			for (Integer i : values) {
-				out.collect(i);
-				numElements++;
-				
-				if (numElements >= failAfterElements) {
-					throw new Exception("Artificial Test Exception");
-				}
-			}
-		}
-	}
-
-	private static StreamTask<?, ?> createMockTask() {
-		StreamTask<?, ?> task = mock(StreamTask.class);
-		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
-		when(task.getName()).thenReturn("Test task name");
-		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
-		Environment env = mock(Environment.class);
-		when(env.getIndexInSubtaskGroup()).thenReturn(0);
-		when(env.getNumberOfSubtasks()).thenReturn(1);
-		when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-
-		when(task.getEnvironment()).thenReturn(env);
-
-		// ugly java generic hacks to get the state backend into the mock
-		@SuppressWarnings("unchecked")
-		OngoingStubbing<StateBackend<?>> stubbing =
-				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
-		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-		
-		return task;
-	}
-
-	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
-	{
-		StreamTask<?, ?> mockTask = createMockTask();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
-		return mockTask;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 4bd260f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,823 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
-public class AggregatingAlignedProcessingTimeWindowOperatorTest {
-
-	@SuppressWarnings("unchecked")
-	private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);
-
-	@SuppressWarnings("unchecked")
-	private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-	
-	private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
-		@Override
-		public Integer getKey(Integer value) {
-			return value;
-		}
-	};
-	
-	private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
-		@Override
-		public Integer reduce(Integer value1, Integer value2) {
-			return value1 + value2;
-		}
-	};
-
-	// ------------------------------------------------------------------------
-
-	@After
-	public void checkNoTriggerThreadsRunning() {
-		// make sure that all the threads we trigger are shut down
-		long deadline = System.currentTimeMillis() + 5000;
-		while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
-			try {
-				Thread.sleep(10);
-			}
-			catch (InterruptedException ignored) {}
-		}
-
-		assertTrue("Not all trigger threads where properly shut down",
-				StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testInvalidParameters() {
-		try {
-			assertInvalidParameter(-1L, -1L);
-			assertInvalidParameter(10000L, -1L);
-			assertInvalidParameter(-1L, 1000L);
-			assertInvalidParameter(1000L, 2000L);
-			
-			// actual internal slide is too low here:
-			assertInvalidParameter(1000L, 999L);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testWindowSizeAndSlide() {
-		try {
-			AggregatingProcessingTimeWindowOperator<String, String> op;
-			
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			assertEquals(5000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(5, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			assertEquals(1000, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(1000, op.getPaneSize());
-			assertEquals(1, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			assertEquals(1500, op.getWindowSize());
-			assertEquals(1000, op.getWindowSlide());
-			assertEquals(500, op.getPaneSize());
-			assertEquals(3, op.getNumPanesPerWindow());
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			assertEquals(1200, op.getWindowSize());
-			assertEquals(1100, op.getWindowSlide());
-			assertEquals(100, op.getPaneSize());
-			assertEquals(12, op.getNumPanesPerWindow());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testWindowTriggerTimeAlignment() {
-		try {
-			@SuppressWarnings("unchecked")
-			final Output<StreamRecord<String>> mockOut = mock(Output.class);
-			final StreamTask<?, ?> mockTask = createMockTask();
-			
-			AggregatingProcessingTimeWindowOperator<String, String> op;
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 1000 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 500 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1000 == 0);
-			op.dispose();
-
-			op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
-			op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
-			op.open();
-			assertTrue(op.getNextSlideTime() % 100 == 0);
-			assertTrue(op.getNextEvaluationTime() % 1100 == 0);
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testTumblingWindowUniqueElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-			
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			assertEquals(numElements, result.size());
-
-			Collections.sort(result);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, result.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdownNow();
-		}
-	}
-
-	@Test
-	public void  testTumblingWindowDuplicateElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final int windowSize = 50;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-			
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numWindows = 10;
-
-			long previousNextTime = 0;
-			int window = 1;
-			
-			while (window <= numWindows) {
-				synchronized (lock) {
-					long nextTime = op.getNextEvaluationTime();
-					int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-					
-					op.processElement(new StreamRecord<Integer>(val));
-
-					if (nextTime != previousNextTime) {
-						window++;
-						previousNextTime = nextTime;
-					}
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-			
-			List<Integer> result = out.getElements();
-			
-			// we have ideally one element per window. we may have more, when we emitted a value into the
-			// successive window (corner case), so we can have twice the number of elements, in the worst case.
-			assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
-
-			// deduplicate for more accurate checks
-			HashSet<Integer> set = new HashSet<>(result);
-			assertTrue(set.size() == 10);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testSlidingWindow() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 20 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							150, 50);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			final int numElements = 1000;
-
-			for (int i = 0; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			
-			// every element can occur between one and three times
-			if (result.size() < numElements || result.size() > 3 * numElements) {
-				System.out.println(result);
-				fail("Wrong number of results: " + result.size());
-			}
-
-			Collections.sort(result);
-			int lastNum = -1;
-			int lastCount = -1;
-			
-			for (int num : result) {
-				if (num == lastNum) {
-					lastCount++;
-					assertTrue(lastCount <= 3);
-				}
-				else {
-					lastNum = num;
-					lastCount = 1;
-				}
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdownNow();
-		}
-	}
-
-	@Test
-	public void testSlidingWindowSingleElements() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 20 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			synchronized (lock) {
-				op.processElement(new StreamRecord<Integer>(1));
-				op.processElement(new StreamRecord<Integer>(2));
-			}
-
-			// each element should end up in the output three times
-			// wait until the elements have arrived 6 times in the output
-			out.waitForNElements(6, 120000);
-			
-			List<Integer> result = out.getElements();
-			assertEquals(6, result.size());
-			
-			Collections.sort(result);
-			assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	@Test
-	public void testEmitTrailingDataOnClose() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-			
-			// the operator has a window time that is so long that it will not fire in this test
-			final long oneYear = 365L * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op = 
-					new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE, oneYear, oneYear);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-			
-			List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-			for (Integer i : data) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-			
-			// get and verify the result
-			List<Integer> result = out.getElements();
-			Collections.sort(result);
-			assertEquals(data, result);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void testPropagateExceptionsFromProcessElement() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final CollectingOutput<Integer> out = new CollectingOutput<>();
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			ReduceFunction<Integer> failingFunction = new FailingFunction(100);
-
-			// the operator has a window time that is so long that it will not fire in this test
-			final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							failingFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							hundredYears, hundredYears);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			for (int i = 0; i < 100; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(1));
-				}
-			}
-			
-			try {
-				op.processElement(new StreamRecord<Integer>(1));
-				fail("This fail with an exception");
-			}
-			catch (Exception e) {
-				assertTrue(e.getMessage().contains("Artificial Test Exception"));
-			}
-
-			op.dispose();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void checkpointRestoreWithPendingWindowTumbling() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int windowSize = 200;
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// tumbling window that triggers every 50 milliseconds
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSize);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			// inject some elements
-			final int numElementsFirst = 700;
-			final int numElements = 1000;
-			
-			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// draw a snapshot and dispose the window
-			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size();
-				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
-			
-			assertTrue(resultAtSnapshot.size() <= numElementsFirst);
-
-			// inject some random elements, which should not show up in the state
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			op.dispose();
-
-			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
-			op = new AggregatingProcessingTimeWindowOperator<>(
-					sumFunction, identitySelector,
-					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-					windowSize, windowSize);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
-			op.open();
-
-			// inject the remaining elements
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
-			assertEquals(numElements, finalResult.size());
-
-			Collections.sort(finalResult);
-			for (int i = 0; i < numElements; i++) {
-				assertEquals(i, finalResult.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-
-	@Test
-	public void checkpointRestoreWithPendingWindowSliding() {
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		try {
-			final int factor = 4;
-			final int windowSlide = 50;
-			final int windowSize = factor * windowSlide;
-
-			final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
-			final Object lock = new Object();
-			final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
-			// sliding window (200 msecs) every 50 msecs
-			AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
-					new AggregatingProcessingTimeWindowOperator<>(
-							sumFunction, identitySelector,
-							IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-							windowSize, windowSlide);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out);
-			op.open();
-
-			// inject some elements
-			final int numElements = 1000;
-			final int numElementsFirst = 700;
-
-			for (int i = 0; i < numElementsFirst; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// draw a snapshot
-			StreamTaskState state;
-			List<Integer> resultAtSnapshot;
-			synchronized (lock) {
-				int beforeSnapShot = out.getElements().size();
-				state = op.snapshotOperatorState(1L, System.currentTimeMillis());
-				resultAtSnapshot = new ArrayList<>(out.getElements());
-				int afterSnapShot = out.getElements().size();
-				assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
-			}
-
-			assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
-
-			// inject the remaining elements - these should not influence the snapshot
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			op.dispose();
-
-			// re-create the operator and restore the state
-			final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
-			op = new AggregatingProcessingTimeWindowOperator<>(
-					sumFunction, identitySelector,
-					IntSerializer.INSTANCE, IntSerializer.INSTANCE,
-					windowSize, windowSlide);
-
-			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
-			op.open();
-
-
-			// inject again the remaining elements
-			for (int i = numElementsFirst; i < numElements; i++) {
-				synchronized (lock) {
-					op.processElement(new StreamRecord<Integer>(i));
-				}
-				Thread.sleep(1);
-			}
-
-			// for a deterministic result, we need to wait until all pending triggers
-			// have fired and emitted their results
-			long deadline = System.currentTimeMillis() + 120000;
-			do {
-				Thread.sleep(20);
-			}
-			while (resultAtSnapshot.size() + out2.getElements().size() < factor * numElements
-					&& System.currentTimeMillis() < deadline);
-
-			synchronized (lock) {
-				op.close();
-			}
-			op.dispose();
-
-			// get and verify the result
-			List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
-			finalResult.addAll(out2.getElements());
-			assertEquals(factor * numElements, finalResult.size());
-
-			Collections.sort(finalResult);
-			for (int i = 0; i < factor * numElements; i++) {
-				assertEquals(i / factor, finalResult.get(i).intValue());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			timerService.shutdown();
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private void assertInvalidParameter(long windowSize, long windowSlide) {
-		try {
-			new AggregatingProcessingTimeWindowOperator<String, String>(
-					mockFunction, mockKeySelector,
-					StringSerializer.INSTANCE, StringSerializer.INSTANCE,
-					windowSize, windowSlide);
-			fail("This should fail with an IllegalArgumentException");
-		}
-		catch (IllegalArgumentException e) {
-			// expected
-		}
-		catch (Exception e) {
-			fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class FailingFunction implements ReduceFunction<Integer> {
-
-		private final int failAfterElements;
-		
-		private int numElements;
-
-		FailingFunction(int failAfterElements) {
-			this.failAfterElements = failAfterElements;
-		}
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			numElements++;
-
-			if (numElements >= failAfterElements) {
-				throw new Exception("Artificial Test Exception");
-			}
-			
-			return value1 + value2;
-		}
-	}
-	
-	private static StreamTask<?, ?> createMockTask() {
-		StreamTask<?, ?> task = mock(StreamTask.class);
-		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
-		when(task.getName()).thenReturn("Test task name");
-		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
-		Environment env = mock(Environment.class);
-		when(env.getIndexInSubtaskGroup()).thenReturn(0);
-		when(env.getNumberOfSubtasks()).thenReturn(1);
-		when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-		
-		when(task.getEnvironment()).thenReturn(env);
-
-		// ugly java generic hacks to get the state backend into the mock
-		@SuppressWarnings("unchecked")
-		OngoingStubbing<StateBackend<?>> stubbing =
-				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
-		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-		
-		return task;
-	}
-
-	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
-	{
-		StreamTask<?, ?> mockTask = createMockTask();
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-		
-		return mockTask;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
deleted file mode 100644
index 282c71f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
- * the correct window operator.
- */
-public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * These tests ensure that the correct trigger is set when using event-time windows.
-	 */
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEventTime() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
-		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testNonEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.trigger(CountTrigger.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
-		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
-		NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-		Assert.assertTrue(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	@Test
-	@SuppressWarnings("rawtypes")
-	public void testEvicting() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-		DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-		DummyReducer reducer = new DummyReducer();
-
-		DataStream<Tuple2<String, Integer>> window1 = source
-				.windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-				.evictor(CountEvictor.of(100))
-				.reduce(reducer);
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-		Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
-		EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
-		Assert.assertFalse(winOperator1.isSetProcessingTime());
-		Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-		Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-		Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-		Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
-		DataStream<Tuple2<String, Integer>> window2 = source
-				.windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-				.trigger(CountTrigger.of(100))
-				.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-				.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void apply(
-							TimeWindow window,
-							Iterable<Tuple2<String, Integer>> values,
-							Collector<Tuple2<String, Integer>> out) throws Exception {
-
-					}
-				});
-
-		OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-		OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-		Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
-		EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
-		Assert.assertFalse(winOperator2.isSetProcessingTime());
-		Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-		Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-		Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-		Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-	}
-
-	// ------------------------------------------------------------------------
-	//  UDFs
-	// ------------------------------------------------------------------------
-
-	public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-			return value1;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
deleted file mode 100644
index cfae026..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
-
-	private static List<String> testResults;
-
-	@Test
-	public void testCoGroup() throws Exception {
-
-		testResults = Lists.newArrayList();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple2.of("a", 0));
-				ctx.collect(Tuple2.of("a", 1));
-				ctx.collect(Tuple2.of("a", 2));
-
-				ctx.collect(Tuple2.of("b", 3));
-				ctx.collect(Tuple2.of("b", 4));
-				ctx.collect(Tuple2.of("b", 5));
-
-				ctx.collect(Tuple2.of("a", 6));
-				ctx.collect(Tuple2.of("a", 7));
-				ctx.collect(Tuple2.of("a", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
-
-		DataStream<Tuple2<String, Integer>> source2 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple2.of("a", 0));
-				ctx.collect(Tuple2.of("a", 1));
-
-				ctx.collect(Tuple2.of("b", 3));
-
-				ctx.collect(Tuple2.of("c", 6));
-				ctx.collect(Tuple2.of("c", 7));
-				ctx.collect(Tuple2.of("c", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple2TimestampExtractor());
-
-
-		source1.coGroup(source2)
-				.where(new Tuple2KeyExtractor())
-				.equalTo(new Tuple2KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-				.apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
-					@Override
-					public void coGroup(Iterable<Tuple2<String, Integer>> first,
-							Iterable<Tuple2<String, Integer>> second,
-							Collector<String> out) throws Exception {
-						StringBuilder result = new StringBuilder();
-						result.append("F:");
-						for (Tuple2<String, Integer> t: first) {
-							result.append(t.toString());
-						}
-						result.append(" S:");
-						for (Tuple2<String, Integer> t: second) {
-							result.append(t.toString());
-						}
-						out.collect(result.toString());
-					}
-				})
-				.addSink(new SinkFunction<String>() {
-					@Override
-					public void invoke(String value) throws Exception {
-						testResults.add(value);
-					}
-				});
-
-		env.execute("CoGroup Test");
-
-		List<String> expectedResult = Lists.newArrayList(
-				"F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
-				"F:(b,3)(b,4)(b,5) S:(b,3)",
-				"F:(a,6)(a,7)(a,8) S:",
-				"F: S:(c,6)(c,7)(c,8)");
-
-		Collections.sort(expectedResult);
-		Collections.sort(testResults);
-
-		Assert.assertEquals(expectedResult, testResults);
-	}
-
-	@Test
-	public void testJoin() throws Exception {
-
-		testResults = Lists.newArrayList();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple3.of("a", "x", 0));
-				ctx.collect(Tuple3.of("a", "y", 1));
-				ctx.collect(Tuple3.of("a", "z", 2));
-
-				ctx.collect(Tuple3.of("b", "u", 3));
-				ctx.collect(Tuple3.of("b", "w", 5));
-
-				ctx.collect(Tuple3.of("a", "i", 6));
-				ctx.collect(Tuple3.of("a", "j", 7));
-				ctx.collect(Tuple3.of("a", "k", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple3TimestampExtractor());
-
-		DataStream<Tuple3<String, String, Integer>> source2 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple3.of("a", "u", 0));
-				ctx.collect(Tuple3.of("a", "w", 1));
-
-				ctx.collect(Tuple3.of("b", "i", 3));
-				ctx.collect(Tuple3.of("b", "k", 5));
-
-				ctx.collect(Tuple3.of("a", "x", 6));
-				ctx.collect(Tuple3.of("a", "z", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple3TimestampExtractor());
-
-
-		source1.join(source2)
-				.where(new Tuple3KeyExtractor())
-				.equalTo(new Tuple3KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
-					@Override
-					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
-						return first + ":" + second;
-					}
-				})
-				.addSink(new SinkFunction<String>() {
-					@Override
-					public void invoke(String value) throws Exception {
-						testResults.add(value);
-					}
-				});
-
-		env.execute("Join Test");
-
-		List<String> expectedResult = Lists.newArrayList(
-				"(a,x,0):(a,u,0)",
-				"(a,x,0):(a,w,1)",
-				"(a,y,1):(a,u,0)",
-				"(a,y,1):(a,w,1)",
-				"(a,z,2):(a,u,0)",
-				"(a,z,2):(a,w,1)",
-				"(b,u,3):(b,i,3)",
-				"(b,u,3):(b,k,5)",
-				"(b,w,5):(b,i,3)",
-				"(b,w,5):(b,k,5)",
-				"(a,i,6):(a,x,6)",
-				"(a,i,6):(a,z,8)",
-				"(a,j,7):(a,x,6)",
-				"(a,j,7):(a,z,8)",
-				"(a,k,8):(a,x,6)",
-				"(a,k,8):(a,z,8)");
-
-		Collections.sort(expectedResult);
-		Collections.sort(testResults);
-
-		Assert.assertEquals(expectedResult, testResults);
-	}
-
-	@Test
-	public void testSelfJoin() throws Exception {
-
-		testResults = Lists.newArrayList();
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-				ctx.collect(Tuple3.of("a", "x", 0));
-				ctx.collect(Tuple3.of("a", "y", 1));
-				ctx.collect(Tuple3.of("a", "z", 2));
-
-				ctx.collect(Tuple3.of("b", "u", 3));
-				ctx.collect(Tuple3.of("b", "w", 5));
-
-				ctx.collect(Tuple3.of("a", "i", 6));
-				ctx.collect(Tuple3.of("a", "j", 7));
-				ctx.collect(Tuple3.of("a", "k", 8));
-			}
-
-			@Override
-			public void cancel() {
-			}
-		}).assignTimestamps(new Tuple3TimestampExtractor());
-
-		source1.join(source1)
-				.where(new Tuple3KeyExtractor())
-				.equalTo(new Tuple3KeyExtractor())
-				.window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-				.apply(new JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String>() {
-					@Override
-					public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
-						return first + ":" + second;
-					}
-				})
-				.addSink(new SinkFunction<String>() {
-					@Override
-					public void invoke(String value) throws Exception {
-						testResults.add(value);
-					}
-				});
-
-		env.execute("Self-Join Test");
-
-		List<String> expectedResult = Lists.newArrayList(
-				"(a,x,0):(a,x,0)",
-				"(a,x,0):(a,y,1)",
-				"(a,x,0):(a,z,2)",
-				"(a,y,1):(a,x,0)",
-				"(a,y,1):(a,y,1)",
-				"(a,y,1):(a,z,2)",
-				"(a,z,2):(a,x,0)",
-				"(a,z,2):(a,y,1)",
-				"(a,z,2):(a,z,2)",
-				"(b,u,3):(b,u,3)",
-				"(b,u,3):(b,w,5)",
-				"(b,w,5):(b,u,3)",
-				"(b,w,5):(b,w,5)",
-				"(a,i,6):(a,i,6)",
-				"(a,i,6):(a,j,7)",
-				"(a,i,6):(a,k,8)",
-				"(a,j,7):(a,i,6)",
-				"(a,j,7):(a,j,7)",
-				"(a,j,7):(a,k,8)",
-				"(a,k,8):(a,i,6)",
-				"(a,k,8):(a,j,7)",
-				"(a,k,8):(a,k,8)");
-
-		Collections.sort(expectedResult);
-		Collections.sort(testResults);
-
-		Assert.assertEquals(expectedResult, testResults);
-	}
-
-	private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
-			return element.f1;
-		}
-
-		@Override
-		public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
-			return element.f1 - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-
-	private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) {
-			return element.f2;
-		}
-
-		@Override
-		public long extractWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
-			return element.f2 - 1;
-		}
-
-		@Override
-		public long getCurrentWatermark() {
-			return Long.MIN_VALUE;
-		}
-	}
-
-	private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-	private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple3<String, String, Integer> value) throws Exception {
-			return value.f0;
-		}
-	}
-
-}