You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:39 UTC
[23/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
deleted file mode 100644
index 0ddf272..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the timer service of {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-@SuppressWarnings("serial")
-public class StreamTaskTimerTest {
-
- @Test
- public void testOpenCloseAndTimestamps() throws Exception {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
-
- StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
- streamConfig.setStreamOperator(mapOperator);
-
- testHarness.invoke();
-
- // first one spawns thread
- mapTask.registerTimer(System.currentTimeMillis(), new Triggerable() {
- @Override
- public void trigger(long timestamp) {}
- });
-
- assertEquals(1, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
-
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
-
- // thread needs to die in time
- long deadline = System.currentTimeMillis() + 4000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- Thread.sleep(10);
- }
-
- assertEquals("Trigger timer thread did not properly shut down",
- 0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
- }
-
- @Test
- public void checkScheduledTimestampe() {
- try {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
- streamConfig.setStreamOperator(mapOperator);
-
- testHarness.invoke();
-
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final long t1 = System.currentTimeMillis();
- final long t2 = System.currentTimeMillis() - 200;
- final long t3 = System.currentTimeMillis() + 100;
- final long t4 = System.currentTimeMillis() + 200;
-
- mapTask.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
- mapTask.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
- mapTask.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
- mapTask.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
-
- long deadline = System.currentTimeMillis() + 20000;
- while (errorRef.get() == null &&
- ValidatingTriggerable.numInSequence < 4 &&
- System.currentTimeMillis() < deadline)
- {
- Thread.sleep(100);
- }
-
- // handle errors
- if (errorRef.get() != null) {
- errorRef.get().printStackTrace();
- fail(errorRef.get().getMessage());
- }
-
- assertEquals(4, ValidatingTriggerable.numInSequence);
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
-
- // wait until the trigger thread is shut down. otherwise, the other tests may become unstable
- deadline = System.currentTimeMillis() + 4000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- Thread.sleep(10);
- }
-
- assertEquals("Trigger timer thread did not properly shut down",
- 0, StreamTask.TRIGGER_THREAD_GROUP.activeCount());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static class ValidatingTriggerable implements Triggerable {
-
- static int numInSequence;
-
- private final AtomicReference<Throwable> errorRef;
-
- private final long expectedTimestamp;
- private final int expectedInSequence;
-
- private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
- this.errorRef = errorRef;
- this.expectedTimestamp = expectedTimestamp;
- this.expectedInSequence = expectedInSequence;
- }
-
- @Override
- public void trigger(long timestamp) {
- try {
- assertEquals(expectedTimestamp, timestamp);
- assertEquals(expectedInSequence, numInSequence);
- numInSequence++;
- }
- catch (Throwable t) {
- errorRef.compareAndSet(null, t);
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- public static class DummyMapFunction<T> implements MapFunction<T, T> {
- @Override
- public T map(T value) { return value; }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index ad3c838..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,824 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.Collector;
-
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
-public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
-
- @SuppressWarnings("unchecked")
- private final WindowFunction<String, String, String, TimeWindow> mockFunction = mock(WindowFunction.class);
-
- @SuppressWarnings("unchecked")
- private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-
- private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- };
-
- private final WindowFunction<Integer, Integer, Integer, TimeWindow> validatingIdentityFunction =
- new WindowFunction<Integer, Integer, Integer, TimeWindow>()
- {
- @Override
- public void apply(Integer key,
- TimeWindow window,
- Iterable<Integer> values,
- Collector<Integer> out) {
- for (Integer val : values) {
- assertEquals(key, val);
- out.collect(val);
- }
- }
- };
-
- // ------------------------------------------------------------------------
-
- @After
- public void checkNoTriggerThreadsRunning() {
- // make sure that all the threads we trigger are shut down
- long deadline = System.currentTimeMillis() + 5000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignored) {}
- }
-
- assertTrue("Not all trigger threads where properly shut down",
- StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testInvalidParameters() {
- try {
- assertInvalidParameter(-1L, -1L);
- assertInvalidParameter(10000L, -1L);
- assertInvalidParameter(-1L, 1000L);
- assertInvalidParameter(1000L, 2000L);
-
- // actual internal slide is too low here:
- assertInvalidParameter(1000L, 999L);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowSizeAndSlide() {
- try {
- AccumulatingProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- assertEquals(5000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(5, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- assertEquals(1000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(1, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- assertEquals(1500, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(500, op.getPaneSize());
- assertEquals(3, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- assertEquals(1200, op.getWindowSize());
- assertEquals(1100, op.getWindowSlide());
- assertEquals(100, op.getPaneSize());
- assertEquals(12, op.getNumPanesPerWindow());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowTriggerTimeAlignment() {
- try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final StreamTask<?, ?> mockTask = createMockTask();
-
- AccumulatingProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 500 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 100 == 0);
- assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingWindow() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // tumbling window that triggers every 50 milliseconds (windowSize)
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- assertEquals(numElements, result.size());
-
- Collections.sort(result);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, result.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testSlidingWindow() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window of 150 milliseconds that slides every 50 milliseconds
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
-
- // if we kept this running, each element would be in the result three times (once for each slide).
- // we are closing the window before the final panes have gone through all three slides, so we may
- // have fewer elements.
- if (result.size() < numElements || result.size() > 3 * numElements) {
- fail("Wrong number of results: " + result.size());
- }
-
- Collections.sort(result);
- int lastNum = -1;
- int lastCount = -1;
-
- for (int num : result) {
- if (num == lastNum) {
- lastCount++;
- assertTrue(lastCount <= 3);
- }
- else {
- lastNum = num;
- lastCount = 1;
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testTumblingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // tumbling window that triggers every 50 milliseconds
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 50, 50);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
- out.waitForNElements(2, 60000);
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(3));
- op.processElement(new StreamRecord<Integer>(4));
- op.processElement(new StreamRecord<Integer>(5));
- }
- out.waitForNElements(5, 60000);
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(6));
- }
- out.waitForNElements(6, 60000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testSlidingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window of 150 milliseconds that slides every 50 milliseconds
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
-
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- out.waitForNElements(6, 120000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testEmitTrailingDataOnClose() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- oneYear, oneYear);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
- for (Integer i : data) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- Collections.sort(result);
- assertEquals(data, result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testPropagateExceptionsFromClose() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- failingFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- hundredYears, hundredYears);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- for (int i = 0; i < 150; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- }
-
- try {
- synchronized (lock) {
- op.close();
- }
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- assertTrue(
- e.getMessage().contains("Artificial Test Exception") ||
- (e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
- }
-
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void checkpointRestoreWithPendingWindowTumbling() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int windowSize = 200;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // tumbling window that triggers every 200 milliseconds (windowSize)
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // inject some elements
- final int numElementsFirst = 700;
- for (int i = 0; i < numElementsFirst; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // draw a snapshot and dispose the window
- StreamTaskState state;
- List<Integer> resultAtSnapshot;
- synchronized (lock) {
- int beforeSnapShot = out.getElements().size();
- state = op.snapshotOperatorState(1L, System.currentTimeMillis());
- resultAtSnapshot = new ArrayList<>(out.getElements());
- int afterSnapShot = out.getElements().size();
- assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
- }
-
- // inject some random elements, which should not show up in the state
- for (int i = 0; i < 300; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i + numElementsFirst));
- }
- Thread.sleep(1);
- }
-
- op.dispose();
-
- // re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
- op = new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out2);
- op.restoreState(state);
- op.open();
-
- // inject some more elements
- final int numElements = 1000;
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
- finalResult.addAll(out2.getElements());
- assertEquals(numElements, finalResult.size());
-
- Collections.sort(finalResult);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, finalResult.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void checkpointRestoreWithPendingWindowSliding() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int factor = 4;
- final int windowSlide = 50;
- final int windowSize = factor * windowSlide;
-
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window (200 msecs) every 50 msecs
- AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSlide);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // inject some elements
- final int numElements = 1000;
- final int numElementsFirst = 700;
-
- for (int i = 0; i < numElementsFirst; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // draw a snapshot
- StreamTaskState state;
- List<Integer> resultAtSnapshot;
- synchronized (lock) {
- int beforeSnapShot = out.getElements().size();
- state = op.snapshotOperatorState(1L, System.currentTimeMillis());
- resultAtSnapshot = new ArrayList<>(out.getElements());
- int afterSnapShot = out.getElements().size();
- assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
- }
-
- assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
-
- // inject the remaining elements - these should not influence the snapshot
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- op.dispose();
-
- // re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
- op = new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSlide);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out2);
- op.restoreState(state);
- op.open();
-
-
- // inject again the remaining elements
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // for a deterministic result, we need to wait until all pending triggers
- // have fired and emitted their results
- long deadline = System.currentTimeMillis() + 120000;
- do {
- Thread.sleep(20);
- }
- while (resultAtSnapshot.size() + out2.getElements().size() < factor * numElements
- && System.currentTimeMillis() < deadline);
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
- finalResult.addAll(out2.getElements());
- assertEquals(factor * numElements, finalResult.size());
-
- Collections.sort(finalResult);
- for (int i = 0; i < factor * numElements; i++) {
- assertEquals(i / factor, finalResult.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- // ------------------------------------------------------------------------
-
- private void assertInvalidParameter(long windowSize, long windowSlide) {
- try {
- new AccumulatingProcessingTimeWindowOperator<String, String, String>(
- mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE,
- windowSize, windowSlide);
- fail("This should fail with an IllegalArgumentException");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- catch (Exception e) {
- fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class FailingFunction implements WindowFunction<Integer, Integer, Integer, TimeWindow> {
-
- private final int failAfterElements;
-
- private int numElements;
-
- FailingFunction(int failAfterElements) {
- this.failAfterElements = failAfterElements;
- }
-
- @Override
- public void apply(Integer integer,
- TimeWindow window,
- Iterable<Integer> values,
- Collector<Integer> out) throws Exception {
- for (Integer i : values) {
- out.collect(i);
- numElements++;
-
- if (numElements >= failAfterElements) {
- throw new Exception("Artificial Test Exception");
- }
- }
- }
- }
-
- private static StreamTask<?, ?> createMockTask() {
- StreamTask<?, ?> task = mock(StreamTask.class);
- when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
- when(task.getName()).thenReturn("Test task name");
- when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
- Environment env = mock(Environment.class);
- when(env.getIndexInSubtaskGroup()).thenReturn(0);
- when(env.getNumberOfSubtasks()).thenReturn(1);
- when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-
- when(task.getEnvironment()).thenReturn(env);
-
- // ugly java generic hacks to get the state backend into the mock
- @SuppressWarnings("unchecked")
- OngoingStubbing<StateBackend<?>> stubbing =
- (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
- stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
- return task;
- }
-
- private static StreamTask<?, ?> createMockTaskWithTimer(
- final ScheduledExecutorService timerService, final Object lock)
- {
- StreamTask<?, ?> mockTask = createMockTask();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
- return mockTask;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 4bd260f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,823 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings({"serial", "SynchronizationOnLocalVariableOrMethodParameter"})
-public class AggregatingAlignedProcessingTimeWindowOperatorTest {
-
- // Mockito mock of a reduce function; this class does not stub any behavior on it
- @SuppressWarnings("unchecked")
- private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);
-
- // Mockito mock of a key selector; this class does not stub any behavior on it
- @SuppressWarnings("unchecked")
- private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-
- // key selector that uses each Integer element as its own key
- private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- };
-
- // reduce function that returns the sum of its two arguments
- private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
- @Override
- public Integer reduce(Integer value1, Integer value2) {
- return value1 + value2;
- }
- };
-
- // ------------------------------------------------------------------------
-
- /**
- * Runs after every test: waits for the threads in StreamTask.TRIGGER_THREAD_GROUP
- * to terminate and fails if any of them is still active once the wait is over.
- */
- @After
- public void checkNoTriggerThreadsRunning() {
- // make sure that all the threads we trigger are shut down
- // (poll every 10 ms, for at most 5 seconds)
- long deadline = System.currentTimeMillis() + 5000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- try {
- Thread.sleep(10);
- }
- // NOTE(review): an interrupt is swallowed here and the interrupt flag is not restored;
- // the loop simply keeps polling until the deadline
- catch (InterruptedException ignored) {}
- }
-
- assertTrue("Not all trigger threads where properly shut down",
- StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Checks that the operator constructor rejects invalid window size / window slide
- * combinations. Each case goes through assertInvalidParameter(windowSize, windowSlide).
- */
- @Test
- public void testInvalidParameters() {
- try {
- // negative size and/or negative slide
- assertInvalidParameter(-1L, -1L);
- assertInvalidParameter(10000L, -1L);
- assertInvalidParameter(-1L, 1000L);
- // slide larger than the window size
- assertInvalidParameter(1000L, 2000L);
-
- // actual internal slide is too low here:
- // (presumably the pane size derived from 1000 and 999 would be 1 ms; confirm against the operator)
- assertInvalidParameter(1000L, 999L);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Constructs operators with several (window size, window slide) pairs and checks the
- * values reported by getWindowSize(), getWindowSlide(), getPaneSize() and
- * getNumPanesPerWindow().
- *
- * In every case below the expected pane size is the greatest common divisor of
- * size and slide, and the expected pane count is size divided by pane size.
- */
- @Test
- public void testWindowSizeAndSlide() {
- try {
- AggregatingProcessingTimeWindowOperator<String, String> op;
-
- // size 5000, slide 1000: slide divides size evenly
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- assertEquals(5000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(5, op.getNumPanesPerWindow());
-
- // size 1000, slide 1000: size equals slide, a single pane
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- assertEquals(1000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(1, op.getNumPanesPerWindow());
-
- // size 1500, slide 1000: slide does not divide size, panes of 500
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- assertEquals(1500, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(500, op.getPaneSize());
- assertEquals(3, op.getNumPanesPerWindow());
-
- // size 1200, slide 1100: panes of 100, twelve panes per window
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- assertEquals(1200, op.getWindowSize());
- assertEquals(1100, op.getWindowSlide());
- assertEquals(100, op.getPaneSize());
- assertEquals(12, op.getNumPanesPerWindow());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Opens operators with several (window size, window slide) pairs against a plain mock
- * task (createMockTask(), no timer stubbing) and checks that getNextSlideTime() and
- * getNextEvaluationTime() are whole multiples of the expected granularity.
- *
- * The divisors used for getNextSlideTime() (1000, 1000, 500, 100) are the pane sizes
- * asserted for the same configurations in testWindowSizeAndSlide(); the divisors used
- * for getNextEvaluationTime() are the window slides.
- */
- @Test
- public void testWindowTriggerTimeAlignment() {
- try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
- final StreamTask<?, ?> mockTask = createMockTask();
-
- AggregatingProcessingTimeWindowOperator<String, String> op;
-
- // size 5000, slide 1000
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 5000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- // size 1000, slide 1000
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1000, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- // size 1500, slide 1000
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1500, 1000);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 500 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- // size 1200, slide 1100
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE, 1200, 1100);
- op.setup(mockTask, new StreamConfig(new Configuration()), mockOut);
- op.open();
- assertTrue(op.getNextSlideTime() % 100 == 0);
- assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Feeds the integers 0..999 into a tumbling window operator (size == slide == 50)
- * and checks that, after close(), the output holds each of them exactly once.
- *
- * Elements are keyed by identitySelector, so every element has its own key.
- */
- @Test
- public void testTumblingWindowUniqueElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int windowSize = 50;
- // NOTE(review): meaning of the CollectingOutput constructor argument is not visible here
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- // the same lock guards element processing here and trigger callbacks in the mock timer
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numElements = 1000;
-
- // one element per iteration, with a 1 ms pause outside the lock
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- assertEquals(numElements, result.size());
-
- // after sorting, position i must hold the value i
- Collections.sort(result);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, result.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdownNow();
- }
- }
-
- /**
- * Feeds a tumbling window operator (size == slide == 50) with elements whose value is
- * derived from the operator's next evaluation time, so that all elements sent while
- * that time is unchanged share one value (and, via identitySelector, one key).
- * Checks the number of emitted results and the number of distinct results.
- */
- @Test
- public void testTumblingWindowDuplicateElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numWindows = 10;
-
- long previousNextTime = 0;
- int window = 1;
-
- // keep sending until numWindows distinct evaluation times have been observed
- while (window <= numWindows) {
- synchronized (lock) {
- long nextTime = op.getNextEvaluationTime();
- // fold the 64-bit evaluation time into an int (xor of low and high 32 bits)
- int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
- op.processElement(new StreamRecord<Integer>(val));
-
- // the evaluation time changed, so a new window has started
- if (nextTime != previousNextTime) {
- window++;
- previousNextTime = nextTime;
- }
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- List<Integer> result = out.getElements();
-
- // we have ideally one element per window. we may have more, when we emitted a value into the
- // successive window (corner case), so we can have twice the number of elements, in the worst case.
- assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
-
- // deduplicate for more accurate checks
- HashSet<Integer> set = new HashSet<>(result);
- // NOTE(review): the literal 10 is the value of numWindows
- assertTrue(set.size() == 10);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- /**
- * Feeds the integers 0..999 into a sliding window operator (size 150, slide 50) and
- * checks that every value shows up in the output at most three times and that the
- * total number of results lies between numElements and 3 * numElements.
- */
- @Test
- public void testSlidingWindow() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window of 150 milliseconds that slides every 50 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- 150, 50);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- final int numElements = 1000;
-
- // one element per iteration, with a 1 ms pause outside the lock
- for (int i = 0; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
-
- // every element can occur between one and three times
- if (result.size() < numElements || result.size() > 3 * numElements) {
- System.out.println(result);
- fail("Wrong number of results: " + result.size());
- }
-
- // after sorting, equal values are adjacent; count the length of each run
- Collections.sort(result);
- int lastNum = -1;
- int lastCount = -1;
-
- for (int num : result) {
- if (num == lastNum) {
- lastCount++;
- assertTrue(lastCount <= 3);
- }
- else {
- // first occurrence of a new value starts a new run
- lastNum = num;
- lastCount = 1;
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdownNow();
- }
- }
-
- /**
- * Sends two elements (1 and 2) into a sliding window operator (size 150, slide 50)
- * and checks that each of them is emitted exactly three times before the operator
- * is closed.
- */
- @Test
- public void testSlidingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window of 150 milliseconds that slides every 50 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, 150, 50);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // both elements are added within one critical section
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
-
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- // (second argument is presumably a timeout in milliseconds; confirm against CollectingOutput)
- out.waitForNElements(6, 120000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
- // the operator is closed only after the output has been verified
- synchronized (lock) {
- op.close();
- }
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- /**
- * Sends ten elements into an operator whose window is one year long and checks that
- * all of them are present in the output once the operator has been closed.
- */
- @Test
- public void testEmitTrailingDataOnClose() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE, oneYear, oneYear);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // distinct values, hence distinct keys under identitySelector
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
- for (Integer i : data) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- // (sorted output must equal the input list, which is already in ascending order)
- List<Integer> result = out.getElements();
- Collections.sort(result);
- assertEquals(data, result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- /**
- * Checks that an exception thrown by the reduce function surfaces from
- * processElement(...). The operator uses a FailingFunction(100) and receives
- * 101 elements that all have the value 1 (and therefore the same key).
- */
- @Test
- public void testPropagateExceptionsFromProcessElement() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // throws on its 100th reduce(...) invocation and on every later one
- ReduceFunction<Integer> failingFunction = new FailingFunction(100);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- failingFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- hundredYears, hundredYears);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // the first 100 elements are expected to be processed without an exception
- // (presumably the first element of a key does not invoke reduce, giving 99 calls; confirm)
- for (int i = 0; i < 100; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- }
- }
-
- // element 101 must fail
- // NOTE(review): unlike the loop above, this call is made without holding the lock
- try {
- op.processElement(new StreamRecord<Integer>(1));
- fail("This fail with an exception");
- }
- catch (Exception e) {
- assertTrue(e.getMessage().contains("Artificial Test Exception"));
- }
-
- // the operator is disposed without a preceding close()
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- /**
- * Snapshot / restore test for a tumbling window (size == slide == 200).
- *
- * Sends elements 0..699, takes a snapshot, keeps feeding 700..999 into the first
- * operator, then disposes it. A second operator is restored from the snapshot and
- * receives 700..999 again. The output recorded at snapshot time plus the output of
- * the second operator must contain every value 0..999 exactly once.
- */
- @Test
- public void checkpointRestoreWithPendingWindowTumbling() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int windowSize = 200;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // tumbling window of 200 milliseconds (size == slide == windowSize)
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // inject some elements
- final int numElementsFirst = 700;
- final int numElements = 1000;
-
- for (int i = 0; i < numElementsFirst; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // draw a snapshot while holding the lock and copy the output produced so far
- StreamTaskState state;
- List<Integer> resultAtSnapshot;
- synchronized (lock) {
- int beforeSnapShot = out.getElements().size();
- state = op.snapshotOperatorState(1L, System.currentTimeMillis());
- resultAtSnapshot = new ArrayList<>(out.getElements());
- int afterSnapShot = out.getElements().size();
- assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
- }
-
- assertTrue(resultAtSnapshot.size() <= numElementsFirst);
-
- // inject the remaining elements after the snapshot; they should not show up in the state
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // the first operator is disposed without close(); its later output is not used below
- op.dispose();
-
- // re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSize);
- op = new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSize);
-
- // restoreState(...) is called between setup(...) and open()
- op.setup(mockTask, new StreamConfig(new Configuration()), out2);
- op.restoreState(state);
- op.open();
-
- // inject the remaining elements
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- // (output at snapshot time + output of the restored operator)
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
- finalResult.addAll(out2.getElements());
- assertEquals(numElements, finalResult.size());
-
- Collections.sort(finalResult);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, finalResult.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- /**
- * Snapshot / restore test for a sliding window (size 200, slide 50, i.e. factor 4).
- *
- * Sends elements 0..699, takes a snapshot, keeps feeding 700..999 into the first
- * operator, then disposes it. A second operator is restored from the snapshot and
- * receives 700..999 again. The output recorded at snapshot time plus the output of
- * the second operator must contain every value 0..999 exactly factor (4) times.
- */
- @Test
- public void checkpointRestoreWithPendingWindowSliding() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- try {
- final int factor = 4;
- final int windowSlide = 50;
- final int windowSize = factor * windowSlide;
-
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSlide);
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- // sliding window (200 msecs) every 50 msecs
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSlide);
-
- op.setup(mockTask, new StreamConfig(new Configuration()), out);
- op.open();
-
- // inject some elements
- final int numElements = 1000;
- final int numElementsFirst = 700;
-
- for (int i = 0; i < numElementsFirst; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // draw a snapshot
- // (taken under the lock, together with a copy of the output produced so far)
- StreamTaskState state;
- List<Integer> resultAtSnapshot;
- synchronized (lock) {
- int beforeSnapShot = out.getElements().size();
- state = op.snapshotOperatorState(1L, System.currentTimeMillis());
- resultAtSnapshot = new ArrayList<>(out.getElements());
- int afterSnapShot = out.getElements().size();
- assertEquals("operator performed computation during snapshot", beforeSnapShot, afterSnapShot);
- }
-
- assertTrue(resultAtSnapshot.size() <= factor * numElementsFirst);
-
- // inject the remaining elements - these should not influence the snapshot
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // the first operator is disposed without close(); its later output is not used below
- op.dispose();
-
- // re-create the operator and restore the state
- final CollectingOutput<Integer> out2 = new CollectingOutput<>(windowSlide);
- op = new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector,
- IntSerializer.INSTANCE, IntSerializer.INSTANCE,
- windowSize, windowSlide);
-
- // restoreState(...) is called between setup(...) and open()
- op.setup(mockTask, new StreamConfig(new Configuration()), out2);
- op.restoreState(state);
- op.open();
-
-
- // inject again the remaining elements
- for (int i = numElementsFirst; i < numElements; i++) {
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(i));
- }
- Thread.sleep(1);
- }
-
- // for a deterministic result, we need to wait until all pending triggers
- // have fired and emitted their results
- // (poll every 20 ms until factor * numElements results exist, for at most 120 seconds)
- long deadline = System.currentTimeMillis() + 120000;
- do {
- Thread.sleep(20);
- }
- while (resultAtSnapshot.size() + out2.getElements().size() < factor * numElements
- && System.currentTimeMillis() < deadline);
-
- synchronized (lock) {
- op.close();
- }
- op.dispose();
-
- // get and verify the result
- // (output at snapshot time + output of the restored operator)
- List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
- finalResult.addAll(out2.getElements());
- assertEquals(factor * numElements, finalResult.size());
-
- // after sorting, each value must occupy exactly 'factor' consecutive positions
- Collections.sort(finalResult);
- for (int i = 0; i < factor * numElements; i++) {
- assertEquals(i / factor, finalResult.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- finally {
- timerService.shutdown();
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Asserts that constructing an operator with the given window size and window slide
- * throws an IllegalArgumentException. Fails the test if the constructor succeeds or
- * throws a different exception type.
- *
- * @param windowSize window size passed to the operator constructor
- * @param windowSlide window slide passed to the operator constructor
- */
- private void assertInvalidParameter(long windowSize, long windowSlide) {
- try {
- new AggregatingProcessingTimeWindowOperator<String, String>(
- mockFunction, mockKeySelector,
- StringSerializer.INSTANCE, StringSerializer.INSTANCE,
- windowSize, windowSlide);
- // reached only if the constructor did not throw; fail(...) raises an AssertionError,
- // which is an Error and therefore passes through both catch clauses below
- fail("This should fail with an IllegalArgumentException");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- catch (Exception e) {
- fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
- }
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Reduce function that sums its two arguments and counts its invocations.
- * From the failAfterElements-th call to reduce(...) onwards it throws an
- * Exception with the message "Artificial Test Exception" instead of returning.
- */
- private static class FailingFunction implements ReduceFunction<Integer> {
-
- // invocation count at which reduce(...) starts to throw
- private final int failAfterElements;
-
- // number of reduce(...) invocations so far (including failing ones)
- private int numElements;
-
- FailingFunction(int failAfterElements) {
- this.failAfterElements = failAfterElements;
- }
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- // the counter is incremented before the check, so the call that reaches the threshold fails
- numElements++;
-
- if (numElements >= failAfterElements) {
- throw new Exception("Artificial Test Exception");
- }
-
- return value1 + value2;
- }
- }
-
- /**
- * Creates a Mockito mock of StreamTask with the stubs the tests in this class rely on:
- * an empty accumulator map, a fixed task name, a fresh ExecutionConfig, a mocked
- * Environment (subtask index 0 of 1, class loader of this test class) and the default
- * MemoryStateBackend. registerTimer(...) is not stubbed here.
- *
- * @return the stubbed mock task
- */
- private static StreamTask<?, ?> createMockTask() {
- StreamTask<?, ?> task = mock(StreamTask.class);
- when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
- when(task.getName()).thenReturn("Test task name");
- when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
- // environment describing a single subtask (index 0 out of 1)
- Environment env = mock(Environment.class);
- when(env.getIndexInSubtaskGroup()).thenReturn(0);
- when(env.getNumberOfSubtasks()).thenReturn(1);
- when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
-
- when(task.getEnvironment()).thenReturn(env);
-
- // ugly java generic hacks to get the state backend into the mock
- // (the stubbing is cast via OngoingStubbing<?> to OngoingStubbing<StateBackend<?>>,
- // presumably because the wildcard return type would not accept thenReturn(...) directly)
- @SuppressWarnings("unchecked")
- OngoingStubbing<StateBackend<?>> stubbing =
- (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
- stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
- return task;
- }
-
- /**
- * Creates a mock task via createMockTask() and additionally stubs
- * registerTimer(long, Triggerable): every registration schedules a one-shot job on
- * the given executor that calls the Triggerable's trigger(timestamp) under the lock.
- *
- * @param timerService executor on which the trigger callbacks are scheduled
- * @param lock monitor that is held while a Triggerable's trigger(...) runs
- * @return the mock task with the timer stubbing installed
- */
- private static StreamTask<?, ?> createMockTaskWithTimer(
- final ScheduledExecutorService timerService, final Object lock)
- {
- StreamTask<?, ?> mockTask = createMockTask();
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- // arguments of the intercepted registerTimer(timestamp, target) call
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- // fire the trigger while holding the shared lock
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- // delay = requested timestamp minus the current wall-clock time
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
- return mockTask;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
deleted file mode 100644
index 282c71f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * These tests verify that the api calls on
- * {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
- * the correct window operator.
- *
- * <p>The programs are only translated, never executed: every assertion inspects the
- * operator held by the transformation that the API call returned, checking its
- * processing-time flag, trigger, window assigner, window buffer factory and
- * (for evicting operators) its evictor.
- */
-public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
-    /**
-     * These tests ensure that the correct trigger is set when using event-time windows.
-     *
-     * <p>With {@link TimeCharacteristic#IngestionTime} and no explicit trigger, both the
-     * reduce and the apply variant are expected to yield a {@link NonKeyedWindowOperator}
-     * with an {@link EventTimeTrigger} and the processing-time flag unset.
-     */
-    @Test
-    @SuppressWarnings("rawtypes")
-    public void testEventTime() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-        DummyReducer reducer = new DummyReducer();
-
-        // sliding windows + reduce: expects the pre-aggregating window buffer
-        DataStream<Tuple2<String, Integer>> window1 = source
-                .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                .reduce(reducer);
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-        Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
-        NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
-        Assert.assertFalse(winOperator1.isSetProcessingTime());
-        Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-        Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-        Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-        // tumbling windows + window function: expects the plain heap window buffer
-        DataStream<Tuple2<String, Integer>> window2 = source
-                .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void apply(
-                            TimeWindow window,
-                            Iterable<Tuple2<String, Integer>> values,
-                            Collector<Tuple2<String, Integer>> out) throws Exception {
-                        // intentionally empty: only the translation is tested
-                    }
-                });
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-        Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
-        NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-        Assert.assertFalse(winOperator2.isSetProcessingTime());
-        Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
-        Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-        Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-    }
-
-    /**
-     * Verifies the translation of non-evicting windows with an explicit
-     * {@link CountTrigger} under {@link TimeCharacteristic#ProcessingTime}.
-     *
-     * <p>Both operators are expected to be {@link NonKeyedWindowOperator}s that carry the
-     * user-specified trigger and have the processing-time flag set.
-     */
-    @Test
-    @SuppressWarnings("rawtypes")
-    public void testNonEvicting() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-        DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-        DummyReducer reducer = new DummyReducer();
-
-        DataStream<Tuple2<String, Integer>> window1 = source
-                .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                .trigger(CountTrigger.of(100))
-                .reduce(reducer);
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-        Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
-        NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
-        Assert.assertTrue(winOperator1.isSetProcessingTime());
-        Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
-        Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-        Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
-        DataStream<Tuple2<String, Integer>> window2 = source
-                .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                .trigger(CountTrigger.of(100))
-                .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void apply(
-                            TimeWindow window,
-                            Iterable<Tuple2<String, Integer>> values,
-                            Collector<Tuple2<String, Integer>> out) throws Exception {
-                        // intentionally empty: only the translation is tested
-                    }
-                });
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-        Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
-        NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
-        // check the second operator here; this assertion previously re-checked winOperator1
-        Assert.assertTrue(winOperator2.isSetProcessingTime());
-        Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-        Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-        Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-    }
-
-    /**
-     * Verifies the translation of windows that specify an evictor, under
-     * {@link TimeCharacteristic#IngestionTime}.
-     *
-     * <p>Both operators are expected to be {@link EvictingNonKeyedWindowOperator}s that
-     * carry the requested evictor and use {@link HeapWindowBuffer.Factory}, for the
-     * reduce variant as well.
-     */
-    @Test
-    @SuppressWarnings("rawtypes")
-    public void testEvicting() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-        DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
-        DummyReducer reducer = new DummyReducer();
-
-        // evictor without explicit trigger: the event-time trigger is expected
-        DataStream<Tuple2<String, Integer>> window1 = source
-                .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
-                .evictor(CountEvictor.of(100))
-                .reduce(reducer);
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
-        Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
-        EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
-        Assert.assertFalse(winOperator1.isSetProcessingTime());
-        Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
-        Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
-        Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
-        Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
-        // explicit trigger and evictor combined with a window function
-        DataStream<Tuple2<String, Integer>> window2 = source
-                .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
-                .trigger(CountTrigger.of(100))
-                .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
-                .apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void apply(
-                            TimeWindow window,
-                            Iterable<Tuple2<String, Integer>> values,
-                            Collector<Tuple2<String, Integer>> out) throws Exception {
-                        // intentionally empty: only the translation is tested
-                    }
-                });
-
-        OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 =
-                (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
-        OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
-        Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
-        EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
-        Assert.assertFalse(winOperator2.isSetProcessingTime());
-        Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
-        Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
-        Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
-        Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-    }
-
-    // ------------------------------------------------------------------------
-    //  UDFs
-    // ------------------------------------------------------------------------
-
-    /**
-     * Reduce function that always returns its first argument; it only serves as a
-     * placeholder so that the reduce translation can be exercised.
-     */
-    public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
-            return value1;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
deleted file mode 100644
index cfae026..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Integration tests for windowed {@code coGroup} and {@code join} on data streams,
- * including a join of a stream with itself.
- *
- * <p>Every test runs with event time and parallelism 1, uses the integer field of each
- * element as its timestamp, applies tumbling windows of 3 milliseconds, and compares
- * the sorted sink output against a sorted list of expected strings.
- */
-public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
-
-    /** Records received by the sink; replaced with a fresh list at the start of each test. */
-    private static List<String> testResults;
-
-    /**
-     * Co-groups two keyed tuple streams and checks the "F:... S:..." string that is
-     * produced for each co-group invocation.
-     */
-    @Test
-    public void testCoGroup() throws Exception {
-
-        testResults = Lists.newArrayList();
-
-        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-        environment.setParallelism(1);
-
-        final DataStream<Tuple2<String, Integer>> firstInput = environment
-                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-                        // timestamps 0..2
-                        ctx.collect(Tuple2.of("a", 0));
-                        ctx.collect(Tuple2.of("a", 1));
-                        ctx.collect(Tuple2.of("a", 2));
-
-                        // timestamps 3..5
-                        ctx.collect(Tuple2.of("b", 3));
-                        ctx.collect(Tuple2.of("b", 4));
-                        ctx.collect(Tuple2.of("b", 5));
-
-                        // timestamps 6..8
-                        ctx.collect(Tuple2.of("a", 6));
-                        ctx.collect(Tuple2.of("a", 7));
-                        ctx.collect(Tuple2.of("a", 8));
-                    }
-
-                    @Override
-                    public void cancel() {
-                    }
-                })
-                .assignTimestamps(new Tuple2TimestampExtractor());
-
-        final DataStream<Tuple2<String, Integer>> secondInput = environment
-                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
-                        ctx.collect(Tuple2.of("a", 0));
-                        ctx.collect(Tuple2.of("a", 1));
-
-                        ctx.collect(Tuple2.of("b", 3));
-
-                        ctx.collect(Tuple2.of("c", 6));
-                        ctx.collect(Tuple2.of("c", 7));
-                        ctx.collect(Tuple2.of("c", 8));
-                    }
-
-                    @Override
-                    public void cancel() {
-                    }
-                })
-                .assignTimestamps(new Tuple2TimestampExtractor());
-
-        firstInput.coGroup(secondInput)
-                .where(new Tuple2KeyExtractor())
-                .equalTo(new Tuple2KeyExtractor())
-                .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-                .apply(new CoGroupFunction<Tuple2<String,Integer>, Tuple2<String,Integer>, String>() {
-                    @Override
-                    public void coGroup(Iterable<Tuple2<String, Integer>> first,
-                            Iterable<Tuple2<String, Integer>> second,
-                            Collector<String> out) throws Exception {
-                        // "F:" + all elements of the first input, then " S:" + all of the second
-                        final StringBuilder text = new StringBuilder();
-                        text.append("F:");
-                        for (Tuple2<String, Integer> fromFirst : first) {
-                            text.append(fromFirst.toString());
-                        }
-                        text.append(" S:");
-                        for (Tuple2<String, Integer> fromSecond : second) {
-                            text.append(fromSecond.toString());
-                        }
-                        out.collect(text.toString());
-                    }
-                })
-                .addSink(new ResultCollectingSink());
-
-        environment.execute("CoGroup Test");
-
-        final List<String> expected = Lists.newArrayList(
-                "F:(a,0)(a,1)(a,2) S:(a,0)(a,1)",
-                "F:(b,3)(b,4)(b,5) S:(b,3)",
-                "F:(a,6)(a,7)(a,8) S:",
-                "F: S:(c,6)(c,7)(c,8)");
-
-        // the comparison is order-insensitive: both lists are sorted first
-        Collections.sort(expected);
-        Collections.sort(testResults);
-
-        Assert.assertEquals(expected, testResults);
-    }
-
-    /**
-     * Joins two keyed tuple streams and checks that every expected
-     * "first:second" pair shows up in the sink.
-     */
-    @Test
-    public void testJoin() throws Exception {
-
-        testResults = Lists.newArrayList();
-
-        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-        environment.setParallelism(1);
-
-        final DataStream<Tuple3<String, String, Integer>> firstInput = environment
-                .addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-                        ctx.collect(Tuple3.of("a", "x", 0));
-                        ctx.collect(Tuple3.of("a", "y", 1));
-                        ctx.collect(Tuple3.of("a", "z", 2));
-
-                        ctx.collect(Tuple3.of("b", "u", 3));
-                        ctx.collect(Tuple3.of("b", "w", 5));
-
-                        ctx.collect(Tuple3.of("a", "i", 6));
-                        ctx.collect(Tuple3.of("a", "j", 7));
-                        ctx.collect(Tuple3.of("a", "k", 8));
-                    }
-
-                    @Override
-                    public void cancel() {
-                    }
-                })
-                .assignTimestamps(new Tuple3TimestampExtractor());
-
-        final DataStream<Tuple3<String, String, Integer>> secondInput = environment
-                .addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-                        ctx.collect(Tuple3.of("a", "u", 0));
-                        ctx.collect(Tuple3.of("a", "w", 1));
-
-                        ctx.collect(Tuple3.of("b", "i", 3));
-                        ctx.collect(Tuple3.of("b", "k", 5));
-
-                        ctx.collect(Tuple3.of("a", "x", 6));
-                        ctx.collect(Tuple3.of("a", "z", 8));
-                    }
-
-                    @Override
-                    public void cancel() {
-                    }
-                })
-                .assignTimestamps(new Tuple3TimestampExtractor());
-
-        firstInput.join(secondInput)
-                .where(new Tuple3KeyExtractor())
-                .equalTo(new Tuple3KeyExtractor())
-                .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-                .apply(new PairFormattingJoin())
-                .addSink(new ResultCollectingSink());
-
-        environment.execute("Join Test");
-
-        final List<String> expected = Lists.newArrayList(
-                "(a,x,0):(a,u,0)",
-                "(a,x,0):(a,w,1)",
-                "(a,y,1):(a,u,0)",
-                "(a,y,1):(a,w,1)",
-                "(a,z,2):(a,u,0)",
-                "(a,z,2):(a,w,1)",
-                "(b,u,3):(b,i,3)",
-                "(b,u,3):(b,k,5)",
-                "(b,w,5):(b,i,3)",
-                "(b,w,5):(b,k,5)",
-                "(a,i,6):(a,x,6)",
-                "(a,i,6):(a,z,8)",
-                "(a,j,7):(a,x,6)",
-                "(a,j,7):(a,z,8)",
-                "(a,k,8):(a,x,6)",
-                "(a,k,8):(a,z,8)");
-
-        // the comparison is order-insensitive: both lists are sorted first
-        Collections.sort(expected);
-        Collections.sort(testResults);
-
-        Assert.assertEquals(expected, testResults);
-    }
-
-    /**
-     * Joins a keyed tuple stream with itself and checks that every expected
-     * "first:second" pair shows up in the sink.
-     */
-    @Test
-    public void testSelfJoin() throws Exception {
-
-        testResults = Lists.newArrayList();
-
-        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
-        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-        environment.setParallelism(1);
-
-        final DataStream<Tuple3<String, String, Integer>> input = environment
-                .addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
-                    private static final long serialVersionUID = 1L;
-
-                    @Override
-                    public void run(SourceContext<Tuple3<String, String, Integer>> ctx) throws Exception {
-                        ctx.collect(Tuple3.of("a", "x", 0));
-                        ctx.collect(Tuple3.of("a", "y", 1));
-                        ctx.collect(Tuple3.of("a", "z", 2));
-
-                        ctx.collect(Tuple3.of("b", "u", 3));
-                        ctx.collect(Tuple3.of("b", "w", 5));
-
-                        ctx.collect(Tuple3.of("a", "i", 6));
-                        ctx.collect(Tuple3.of("a", "j", 7));
-                        ctx.collect(Tuple3.of("a", "k", 8));
-                    }
-
-                    @Override
-                    public void cancel() {
-                    }
-                })
-                .assignTimestamps(new Tuple3TimestampExtractor());
-
-        // the same stream is used as both join inputs
-        input.join(input)
-                .where(new Tuple3KeyExtractor())
-                .equalTo(new Tuple3KeyExtractor())
-                .window(TumblingTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
-                .apply(new PairFormattingJoin())
-                .addSink(new ResultCollectingSink());
-
-        environment.execute("Self-Join Test");
-
-        final List<String> expected = Lists.newArrayList(
-                "(a,x,0):(a,x,0)",
-                "(a,x,0):(a,y,1)",
-                "(a,x,0):(a,z,2)",
-                "(a,y,1):(a,x,0)",
-                "(a,y,1):(a,y,1)",
-                "(a,y,1):(a,z,2)",
-                "(a,z,2):(a,x,0)",
-                "(a,z,2):(a,y,1)",
-                "(a,z,2):(a,z,2)",
-                "(b,u,3):(b,u,3)",
-                "(b,u,3):(b,w,5)",
-                "(b,w,5):(b,u,3)",
-                "(b,w,5):(b,w,5)",
-                "(a,i,6):(a,i,6)",
-                "(a,i,6):(a,j,7)",
-                "(a,i,6):(a,k,8)",
-                "(a,j,7):(a,i,6)",
-                "(a,j,7):(a,j,7)",
-                "(a,j,7):(a,k,8)",
-                "(a,k,8):(a,i,6)",
-                "(a,k,8):(a,j,7)",
-                "(a,k,8):(a,k,8)");
-
-        // the comparison is order-insensitive: both lists are sorted first
-        Collections.sort(expected);
-        Collections.sort(testResults);
-
-        Assert.assertEquals(expected, testResults);
-    }
-
-    /** Join function that renders a matched pair as {@code first + ":" + second}. */
-    private static class PairFormattingJoin
-            implements JoinFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, String> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public String join(Tuple3<String, String, Integer> first, Tuple3<String, String, Integer> second) throws Exception {
-            return first + ":" + second;
-        }
-    }
-
-    /** Sink that appends every received record to {@link #testResults}. */
-    private static class ResultCollectingSink implements SinkFunction<String> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public void invoke(String value) throws Exception {
-            testResults.add(value);
-        }
-    }
-
-    /**
-     * Uses the integer field ({@code f1}) as the element timestamp and that value
-     * minus one as the per-element watermark.
-     */
-    private static class Tuple2TimestampExtractor implements TimestampExtractor<Tuple2<String, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractTimestamp(Tuple2<String, Integer> element, long currentTimestamp) {
-            return element.f1;
-        }
-
-        @Override
-        public long extractWatermark(Tuple2<String, Integer> element, long currentTimestamp) {
-            return element.f1 - 1;
-        }
-
-        @Override
-        public long getCurrentWatermark() {
-            return Long.MIN_VALUE;
-        }
-    }
-
-    /**
-     * Uses the integer field ({@code f2}) as the element timestamp and that value
-     * minus one as the per-element watermark.
-     */
-    private static class Tuple3TimestampExtractor implements TimestampExtractor<Tuple3<String, String, Integer>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractTimestamp(Tuple3<String, String, Integer> element, long currentTimestamp) {
-            return element.f2;
-        }
-
-        @Override
-        public long extractWatermark(Tuple3<String, String, Integer> element, long currentTimestamp) {
-            return element.f2 - 1;
-        }
-
-        @Override
-        public long getCurrentWatermark() {
-            return Long.MIN_VALUE;
-        }
-    }
-
-    /** Key selector returning the first field ({@code f0}) of a two-field tuple. */
-    private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String,Integer>, String> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public String getKey(Tuple2<String, Integer> value) throws Exception {
-            return value.f0;
-        }
-    }
-
-    /** Key selector returning the first field ({@code f0}) of a three-field tuple. */
-    private static class Tuple3KeyExtractor implements KeySelector<Tuple3<String, String, Integer>, String> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public String getKey(Tuple3<String, String, Integer> value) throws Exception {
-            return value.f0;
-        }
-    }
-
-}