You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/09/28 18:14:58 UTC
[02/12] flink git commit: Move window operators and tests to
windowing package
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 19801f1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,547 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.windows.KeyedWindowFunction;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.apache.flink.util.Collector;
-import org.junit.After;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-@SuppressWarnings("serial")
-public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
-
- @SuppressWarnings("unchecked")
- private final KeyedWindowFunction<String, String, String> mockFunction = mock(KeyedWindowFunction.class);
-
- @SuppressWarnings("unchecked")
- private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-
- private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- };
-
- private final KeyedWindowFunction<Integer, Integer, Integer> validatingIdentityFunction =
- new KeyedWindowFunction<Integer, Integer, Integer>()
- {
- @Override
- public void evaluate(Integer key, Iterable<Integer> values, Collector<Integer> out) {
- for (Integer val : values) {
- assertEquals(key, val);
- out.collect(val);
- }
- }
- };
-
- // ------------------------------------------------------------------------
-
- @After
- public void checkNoTriggerThreadsRunning() {
- // make sure that all the threads we trigger are shut down
- long deadline = System.currentTimeMillis() + 5000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignored) {}
- }
-
- assertTrue("Not all trigger threads where properly shut down",
- StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testInvalidParameters() {
- try {
- assertInvalidParameter(-1L, -1L);
- assertInvalidParameter(10000L, -1L);
- assertInvalidParameter(-1L, 1000L);
- assertInvalidParameter(1000L, 2000L);
-
- // actual internal slide is too low here:
- assertInvalidParameter(1000L, 999L);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowSizeAndSlide() {
- try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- assertEquals(5000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(5, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- assertEquals(1000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(1, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- assertEquals(1500, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(500, op.getPaneSize());
- assertEquals(3, op.getNumPanesPerWindow());
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- assertEquals(1200, op.getWindowSize());
- assertEquals(1100, op.getWindowSlide());
- assertEquals(100, op.getPaneSize());
- assertEquals(12, op.getNumPanesPerWindow());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowTriggerTimeAlignment() {
- try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 500 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AccumulatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 100 == 0);
- assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingWindow() {
- try {
- final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- // tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- validatingIdentityFunction, identitySelector, windowSize, windowSize);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
- Thread.sleep(1);
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- assertEquals(numElements, result.size());
-
- Collections.sort(result);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, result.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSlidingWindow() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- // tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
- Thread.sleep(1);
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
-
- // if we kept this running, each element would be in the result three times (for each slide).
- // we are closing the window before the final panes are through three times, so we may have less
- // elements.
- if (result.size() < numElements || result.size() > 3 * numElements) {
- fail("Wrong number of results: " + result.size());
- }
-
- Collections.sort(result);
- int lastNum = -1;
- int lastCount = -1;
-
- for (int num : result) {
- if (num == lastNum) {
- lastCount++;
- assertTrue(lastCount <= 3);
- }
- else {
- lastNum = num;
- lastCount = 1;
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- final Object lock = new Object();
-
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
- // tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 50, 50);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
- out.waitForNElements(2, 60000);
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(3));
- op.processElement(new StreamRecord<Integer>(4));
- op.processElement(new StreamRecord<Integer>(5));
- }
- out.waitForNElements(5, 60000);
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(6));
- }
- out.waitForNElements(6, 60000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), result);
-
- op.close();
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testSlidingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- final Object lock = new Object();
-
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
- // tumbling window that triggers every 20 milliseconds
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector, 150, 50);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
-
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- out.waitForNElements(6, 120000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
- op.close();
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testEmitTrailingDataOnClose() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- // the operator has a window time that is so long that it will not fire in this test
- final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(validatingIdentityFunction, identitySelector,
- oneYear, oneYear);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
- for (Integer i : data) {
- op.processElement(new StreamRecord<Integer>(i));
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- Collections.sort(result);
- assertEquals(data, result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPropagateExceptionsFromClose() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- KeyedWindowFunction<Integer, Integer, Integer> failingFunction = new FailingFunction(100);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AbstractAlignedProcessingTimeWindowOperator<Integer, Integer, Integer> op =
- new AccumulatingProcessingTimeWindowOperator<>(
- failingFunction, identitySelector, hundredYears, hundredYears);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- for (int i = 0; i < 150; i++) {
- op.processElement(new StreamRecord<Integer>(i));
- }
-
- try {
- op.close();
- fail("This should fail with an exception");
- }
- catch (Exception e) {
- assertTrue(
- e.getMessage().contains("Artificial Test Exception") ||
- (e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
- }
-
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private void assertInvalidParameter(long windowSize, long windowSlide) {
- try {
- new AccumulatingProcessingTimeWindowOperator<String, String, String>(
- mockFunction, mockKeySelector, windowSize, windowSlide);
- fail("This should fail with an IllegalArgumentException");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- catch (Exception e) {
- fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class FailingFunction implements KeyedWindowFunction<Integer, Integer, Integer> {
-
- private final int failAfterElements;
-
- private int numElements;
-
- FailingFunction(int failAfterElements) {
- this.failAfterElements = failAfterElements;
- }
-
- @Override
- public void evaluate(Integer integer, Iterable<Integer> values, Collector<Integer> out) throws Exception {
- for (Integer i : values) {
- out.collect(i);
- numElements++;
-
- if (numElements >= failAfterElements) {
- throw new Exception("Artificial Test Exception");
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
deleted file mode 100644
index 0ff974c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import org.junit.After;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@SuppressWarnings("serial")
-public class AggregatingAlignedProcessingTimeWindowOperatorTest {
-
- @SuppressWarnings("unchecked")
- private final ReduceFunction<String> mockFunction = mock(ReduceFunction.class);
-
- @SuppressWarnings("unchecked")
- private final KeySelector<String, String> mockKeySelector = mock(KeySelector.class);
-
- private final KeySelector<Integer, Integer> identitySelector = new KeySelector<Integer, Integer>() {
- @Override
- public Integer getKey(Integer value) {
- return value;
- }
- };
-
- private final ReduceFunction<Integer> sumFunction = new ReduceFunction<Integer>() {
- @Override
- public Integer reduce(Integer value1, Integer value2) {
- return value1 + value2;
- }
- };
-
- // ------------------------------------------------------------------------
-
- @After
- public void checkNoTriggerThreadsRunning() {
- // make sure that all the threads we trigger are shut down
- long deadline = System.currentTimeMillis() + 5000;
- while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
- try {
- Thread.sleep(10);
- }
- catch (InterruptedException ignored) {}
- }
-
- assertTrue("Not all trigger threads where properly shut down",
- StreamTask.TRIGGER_THREAD_GROUP.activeCount() == 0);
- }
-
- // ------------------------------------------------------------------------
-
- @Test
- public void testInvalidParameters() {
- try {
- assertInvalidParameter(-1L, -1L);
- assertInvalidParameter(10000L, -1L);
- assertInvalidParameter(-1L, 1000L);
- assertInvalidParameter(1000L, 2000L);
-
- // actual internal slide is too low here:
- assertInvalidParameter(1000L, 999L);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowSizeAndSlide() {
- try {
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- assertEquals(5000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(5, op.getNumPanesPerWindow());
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- assertEquals(1000, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(1000, op.getPaneSize());
- assertEquals(1, op.getNumPanesPerWindow());
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- assertEquals(1500, op.getWindowSize());
- assertEquals(1000, op.getWindowSlide());
- assertEquals(500, op.getPaneSize());
- assertEquals(3, op.getNumPanesPerWindow());
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- assertEquals(1200, op.getWindowSize());
- assertEquals(1100, op.getWindowSlide());
- assertEquals(100, op.getPaneSize());
- assertEquals(12, op.getNumPanesPerWindow());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testWindowTriggerTimeAlignment() {
- try {
- @SuppressWarnings("unchecked")
- final Output<StreamRecord<String>> mockOut = mock(Output.class);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- AbstractAlignedProcessingTimeWindowOperator<String, String, String> op;
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 5000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1000, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 1000 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1500, 1000);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 500 == 0);
- assertTrue(op.getNextEvaluationTime() % 1000 == 0);
- op.dispose();
-
- op = new AggregatingProcessingTimeWindowOperator<>(mockFunction, mockKeySelector, 1200, 1100);
- op.setup(mockOut, mockContext);
- op.open(new Configuration());
- assertTrue(op.getNextSlideTime() % 100 == 0);
- assertTrue(op.getNextEvaluationTime() % 1100 == 0);
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingWindowUniqueElements() {
- try {
- final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector, windowSize, windowSize);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
- Thread.sleep(1);
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- assertEquals(numElements, result.size());
-
- Collections.sort(result);
- for (int i = 0; i < numElements; i++) {
- assertEquals(i, result.get(i).intValue());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testTumblingWindowDuplicateElements() {
-
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final int windowSize = 50;
- final CollectingOutput<Integer> out = new CollectingOutput<>(windowSize);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- final Object lock = new Object();
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- sumFunction, identitySelector, windowSize, windowSize);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- final int numWindows = 10;
-
- long previousNextTime = 0;
- int window = 1;
-
- while (window <= numWindows) {
- long nextTime = op.getNextEvaluationTime();
- int val = ((int) nextTime) ^ ((int) (nextTime >>> 32));
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(val));
- }
-
- if (nextTime != previousNextTime) {
- window++;
- previousNextTime = nextTime;
- }
-
- Thread.sleep(1);
- }
-
- op.close();
- op.dispose();
-
- List<Integer> result = out.getElements();
-
- // we have ideally one element per window. we may have more, when we emitted a value into the
- // successive window (corner case), so we can have twice the number of elements, in the worst case.
- assertTrue(result.size() >= numWindows && result.size() <= 2 * numWindows);
-
- // deduplicate for more accurate checks
- HashSet<Integer> set = new HashSet<>(result);
- assertTrue(set.size() == 10 || set.size() == 11);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testSlidingWindow() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- // tumbling window that triggers every 20 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- final int numElements = 1000;
-
- for (int i = 0; i < numElements; i++) {
- op.processElement(new StreamRecord<Integer>(i));
- Thread.sleep(1);
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
-
- // every element can occur between one and three times
- if (result.size() < numElements || result.size() > 3 * numElements) {
- System.out.println(result);
- fail("Wrong number of results: " + result.size());
- }
-
- Collections.sort(result);
- int lastNum = -1;
- int lastCount = -1;
-
- for (int num : result) {
- if (num == lastNum) {
- lastCount++;
- assertTrue(lastCount <= 3);
- }
- else {
- lastNum = num;
- lastCount = 1;
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSlidingWindowSingleElements() {
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>(50);
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- final Object lock = new Object();
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(mockContext).registerTimer(anyLong(), any(Triggerable.class));
-
- // tumbling window that triggers every 20 milliseconds
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, 150, 50);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- synchronized (lock) {
- op.processElement(new StreamRecord<Integer>(1));
- op.processElement(new StreamRecord<Integer>(2));
- }
-
- // each element should end up in the output three times
- // wait until the elements have arrived 6 times in the output
- out.waitForNElements(6, 120000);
-
- List<Integer> result = out.getElements();
- assertEquals(6, result.size());
-
- Collections.sort(result);
- assertEquals(Arrays.asList(1, 1, 1, 2, 2, 2), result);
-
- op.close();
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- timerService.shutdown();
- }
- }
-
- @Test
- public void testEmitTrailingDataOnClose() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- // the operator has a window time that is so long that it will not fire in this test
- final long oneYear = 365L * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(sumFunction, identitySelector, oneYear, oneYear);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
- for (Integer i : data) {
- op.processElement(new StreamRecord<Integer>(i));
- }
-
- op.close();
- op.dispose();
-
- // get and verify the result
- List<Integer> result = out.getElements();
- Collections.sort(result);
- assertEquals(data, result);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPropagateExceptionsFromProcessElement() {
- try {
- final CollectingOutput<Integer> out = new CollectingOutput<>();
-
- final StreamingRuntimeContext mockContext = mock(StreamingRuntimeContext.class);
- when(mockContext.getTaskName()).thenReturn("Test task name");
-
- ReduceFunction<Integer> failingFunction = new FailingFunction(100);
-
- // the operator has a window time that is so long that it will not fire in this test
- final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
- AggregatingProcessingTimeWindowOperator<Integer, Integer> op =
- new AggregatingProcessingTimeWindowOperator<>(
- failingFunction, identitySelector, hundredYears, hundredYears);
-
- op.setup(out, mockContext);
- op.open(new Configuration());
-
- for (int i = 0; i < 100; i++) {
- op.processElement(new StreamRecord<Integer>(1));
- }
-
- try {
- op.processElement(new StreamRecord<Integer>(1));
- fail("This fail with an exception");
- }
- catch (Exception e) {
- assertTrue(e.getMessage().contains("Artificial Test Exception"));
- }
-
- op.dispose();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private void assertInvalidParameter(long windowSize, long windowSlide) {
- try {
- new AggregatingProcessingTimeWindowOperator<String, String>(
- mockFunction, mockKeySelector, windowSize, windowSlide);
- fail("This should fail with an IllegalArgumentException");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
- catch (Exception e) {
- fail("Wrong exception. Expected IllegalArgumentException but found " + e.getClass().getSimpleName());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class FailingFunction implements ReduceFunction<Integer> {
-
- private final int failAfterElements;
-
- private int numElements;
-
- FailingFunction(int failAfterElements) {
- this.failAfterElements = failAfterElements;
- }
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- numElements++;
-
- if (numElements >= failAfterElements) {
- throw new Exception("Artificial Test Exception");
- }
-
- return value1 + value2;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
deleted file mode 100644
index 9f6858d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/CollectingOutput.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CollectingOutput<T> implements Output<StreamRecord<T>> {
-
- private final List<T> elements = new ArrayList<>();
-
- private final int timeStampModulus;
-
-
- public CollectingOutput() {
- this.timeStampModulus = 0;
- }
-
- public CollectingOutput(int timeStampModulus) {
- this.timeStampModulus = timeStampModulus;
- }
-
- // ------------------------------------------------------------------------
-
- public List<T> getElements() {
- return elements;
- }
-
- public void waitForNElements(int n, long timeout) throws InterruptedException {
- long deadline = System.currentTimeMillis() + timeout;
- synchronized (elements) {
- long now;
- while (elements.size() < n && (now = System.currentTimeMillis()) < deadline) {
- elements.wait(deadline - now);
- }
- }
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new UnsupportedOperationException("the output should not emit watermarks");
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- elements.add(record.getValue());
-
- if (timeStampModulus != 0 && record.getTimestamp() % timeStampModulus != 0) {
- throw new IllegalArgumentException("Invalid timestamp");
- }
- synchronized (elements) {
- elements.notifyAll();
- }
- }
-
- @Override
- public void close() {}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
deleted file mode 100644
index 2a9e203..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutIfAbsentTest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutIfAbsentTest {
-
- @Test
- public void testPutIfAbsentUniqueKeysAndGrowth() {
- try {
- KeyMap<Integer, Integer> map = new KeyMap<>();
- IntegerFactory factory = new IntegerFactory();
-
- final int numElements = 1000000;
-
- for (int i = 0; i < numElements; i++) {
- factory.set(2 * i + 1);
- map.putIfAbsent(i, factory);
-
- assertEquals(i+1, map.size());
- assertTrue(map.getCurrentTableCapacity() > map.size());
- assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
- assertTrue(map.size() <= map.getRehashThreshold());
- }
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
-
- for (int i = 0; i < numElements; i++) {
- assertEquals(2 * i + 1, map.get(i).intValue());
- }
-
- for (int i = numElements - 1; i >= 0; i--) {
- assertEquals(2 * i + 1, map.get(i).intValue());
- }
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
- assertTrue(map.getLongestChainLength() <= 7);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPutIfAbsentDuplicateKeysAndGrowth() {
- try {
- KeyMap<Integer, Integer> map = new KeyMap<>();
- IntegerFactory factory = new IntegerFactory();
-
- final int numElements = 1000000;
-
- for (int i = 0; i < numElements; i++) {
- int val = 2 * i + 1;
- factory.set(val);
- Integer put = map.putIfAbsent(i, factory);
- assertEquals(val, put.intValue());
- }
-
- for (int i = 0; i < numElements; i += 3) {
- factory.set(2 * i);
- Integer put = map.putIfAbsent(i, factory);
- assertEquals(2 * i + 1, put.intValue());
- }
-
- for (int i = 0; i < numElements; i++) {
- assertEquals(2 * i + 1, map.get(i).intValue());
- }
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
- assertTrue(map.getLongestChainLength() <= 7);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static class IntegerFactory implements KeyMap.LazyFactory<Integer> {
-
- private Integer toCreate;
-
- public void set(Integer toCreate) {
- this.toCreate = toCreate;
- }
-
- @Override
- public Integer create() {
- return toCreate;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
deleted file mode 100644
index 7335976..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapPutTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import java.util.BitSet;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class KeyMapPutTest {
-
- @Test
- public void testPutUniqueKeysAndGrowth() {
- try {
- KeyMap<Integer, Integer> map = new KeyMap<>();
-
- final int numElements = 1000000;
-
- for (int i = 0; i < numElements; i++) {
- map.put(i, 2 * i + 1);
-
- assertEquals(i+1, map.size());
- assertTrue(map.getCurrentTableCapacity() > map.size());
- assertTrue(map.getCurrentTableCapacity() > map.getRehashThreshold());
- assertTrue(map.size() <= map.getRehashThreshold());
- }
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
-
- for (int i = 0; i < numElements; i++) {
- assertEquals(2 * i + 1, map.get(i).intValue());
- }
-
- for (int i = numElements - 1; i >= 0; i--) {
- assertEquals(2 * i + 1, map.get(i).intValue());
- }
-
- BitSet bitset = new BitSet();
- int numContained = 0;
- for (KeyMap.Entry<Integer, Integer> entry : map) {
- numContained++;
-
- assertEquals(entry.getKey() * 2 + 1, entry.getValue().intValue());
- assertFalse(bitset.get(entry.getKey()));
- bitset.set(entry.getKey());
- }
-
- assertEquals(numElements, numContained);
- assertEquals(numElements, bitset.cardinality());
-
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
- assertTrue(map.getLongestChainLength() <= 7);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPutDuplicateKeysAndGrowth() {
- try {
- final KeyMap<Integer, Integer> map = new KeyMap<>();
- final int numElements = 1000000;
-
- for (int i = 0; i < numElements; i++) {
- Integer put = map.put(i, 2*i+1);
- assertNull(put);
- }
-
- for (int i = 0; i < numElements; i += 3) {
- Integer put = map.put(i, 2*i);
- assertNotNull(put);
- assertEquals(2*i+1, put.intValue());
- }
-
- for (int i = 0; i < numElements; i++) {
- int expected = (i % 3 == 0) ? (2*i) : (2*i+1);
- assertEquals(expected, map.get(i).intValue());
- }
-
- assertEquals(numElements, map.size());
- assertEquals(numElements, map.traverseAndCountElements());
- assertEquals(1 << 21, map.getCurrentTableCapacity());
- assertTrue(map.getLongestChainLength() <= 7);
-
-
- BitSet bitset = new BitSet();
- int numContained = 0;
- for (KeyMap.Entry<Integer, Integer> entry : map) {
- numContained++;
-
- int key = entry.getKey();
- int expected = key % 3 == 0 ? (2*key) : (2*key+1);
-
- assertEquals(expected, entry.getValue().intValue());
- assertFalse(bitset.get(key));
- bitset.set(key);
- }
-
- assertEquals(numElements, numContained);
- assertEquals(numElements, bitset.cardinality());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/05d2138f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
deleted file mode 100644
index be71af2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windows/KeyMapTest.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.windows;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class KeyMapTest {
-
- @Test
- public void testInitialSizeComputation() {
- try {
- KeyMap<String, String> map;
-
- map = new KeyMap<>();
- assertEquals(64, map.getCurrentTableCapacity());
- assertEquals(6, map.getLog2TableCapacity());
- assertEquals(24, map.getShift());
- assertEquals(48, map.getRehashThreshold());
-
- map = new KeyMap<>(0);
- assertEquals(64, map.getCurrentTableCapacity());
- assertEquals(6, map.getLog2TableCapacity());
- assertEquals(24, map.getShift());
- assertEquals(48, map.getRehashThreshold());
-
- map = new KeyMap<>(1);
- assertEquals(64, map.getCurrentTableCapacity());
- assertEquals(6, map.getLog2TableCapacity());
- assertEquals(24, map.getShift());
- assertEquals(48, map.getRehashThreshold());
-
- map = new KeyMap<>(9);
- assertEquals(64, map.getCurrentTableCapacity());
- assertEquals(6, map.getLog2TableCapacity());
- assertEquals(24, map.getShift());
- assertEquals(48, map.getRehashThreshold());
-
- map = new KeyMap<>(63);
- assertEquals(64, map.getCurrentTableCapacity());
- assertEquals(6, map.getLog2TableCapacity());
- assertEquals(24, map.getShift());
- assertEquals(48, map.getRehashThreshold());
-
- map = new KeyMap<>(64);
- assertEquals(128, map.getCurrentTableCapacity());
- assertEquals(7, map.getLog2TableCapacity());
- assertEquals(23, map.getShift());
- assertEquals(96, map.getRehashThreshold());
-
- map = new KeyMap<>(500);
- assertEquals(512, map.getCurrentTableCapacity());
- assertEquals(9, map.getLog2TableCapacity());
- assertEquals(21, map.getShift());
- assertEquals(384, map.getRehashThreshold());
-
- map = new KeyMap<>(127);
- assertEquals(128, map.getCurrentTableCapacity());
- assertEquals(7, map.getLog2TableCapacity());
- assertEquals(23, map.getShift());
- assertEquals(96, map.getRehashThreshold());
-
- // no negative number of elements
- try {
- new KeyMap<>(-1);
- fail("should fail with an exception");
- }
- catch (IllegalArgumentException e) {
- // expected
- }
-
- // check integer overflow
- try {
- map = new KeyMap<>(0x65715522);
-
- final int maxCap = Integer.highestOneBit(Integer.MAX_VALUE);
- assertEquals(Integer.highestOneBit(Integer.MAX_VALUE), map.getCurrentTableCapacity());
- assertEquals(30, map.getLog2TableCapacity());
- assertEquals(0, map.getShift());
- assertEquals(maxCap / 4 * 3, map.getRehashThreshold());
- }
- catch (OutOfMemoryError e) {
- // this may indeed happen in small test setups. we tolerate this in this test
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPutAndGetRandom() {
- try {
- final KeyMap<Integer, Integer> map = new KeyMap<>();
- final Random rnd = new Random();
-
- final long seed = rnd.nextLong();
- final int numElements = 10000;
-
- final HashMap<Integer, Integer> groundTruth = new HashMap<>();
-
- rnd.setSeed(seed);
- for (int i = 0; i < numElements; i++) {
- Integer key = rnd.nextInt();
- Integer value = rnd.nextInt();
-
- if (rnd.nextBoolean()) {
- groundTruth.put(key, value);
- map.put(key, value);
- }
- }
-
- rnd.setSeed(seed);
- for (int i = 0; i < numElements; i++) {
- Integer key = rnd.nextInt();
-
- // skip these, evaluating it is tricky due to duplicates
- rnd.nextInt();
- rnd.nextBoolean();
-
- Integer expected = groundTruth.get(key);
- if (expected == null) {
- assertNull(map.get(key));
- }
- else {
- Integer contained = map.get(key);
- assertNotNull(contained);
- assertEquals(expected, contained);
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testConjunctTraversal() {
- try {
- final Random rootRnd = new Random(654685486325439L);
-
- final int numMaps = 7;
- final int numKeys = 1000000;
-
- // ------ create a set of maps ------
- @SuppressWarnings("unchecked")
- final KeyMap<Integer, Integer>[] maps = (KeyMap<Integer, Integer>[]) new KeyMap<?, ?>[numMaps];
- for (int i = 0; i < numMaps; i++) {
- maps[i] = new KeyMap<>();
- }
-
- // ------ prepare probabilities for maps ------
- final double[] probabilities = new double[numMaps];
- final double[] probabilitiesTemp = new double[numMaps];
- {
- probabilities[0] = 0.5;
- double remainingProb = 1.0 - probabilities[0];
- for (int i = 1; i < numMaps - 1; i++) {
- remainingProb /= 2;
- probabilities[i] = remainingProb;
- }
-
- // compensate for rounding errors
- probabilities[numMaps - 1] = remainingProb;
- }
-
- // ------ generate random elements ------
- final long probSeed = rootRnd.nextLong();
- final long keySeed = rootRnd.nextLong();
-
- final Random probRnd = new Random(probSeed);
- final Random keyRnd = new Random(keySeed);
-
- final int maxStride = Integer.MAX_VALUE / numKeys;
-
- int totalNumElements = 0;
- int nextKeyValue = 1;
-
- for (int i = 0; i < numKeys; i++) {
- int numCopies = (nextKeyValue % 3) + 1;
- System.arraycopy(probabilities, 0, probabilitiesTemp, 0, numMaps);
-
- double totalProb = 1.0;
- for (int copy = 0; copy < numCopies; copy++) {
- int pos = drawPosProportionally(probabilitiesTemp, totalProb, probRnd);
- totalProb -= probabilitiesTemp[pos];
- probabilitiesTemp[pos] = 0.0;
-
- Integer boxed = nextKeyValue;
- Integer previous = maps[pos].put(boxed, boxed);
- assertNull("Test problem - test does not assign unique maps", previous);
- }
-
- totalNumElements += numCopies;
- nextKeyValue += keyRnd.nextInt(maxStride) + 1;
- }
-
-
- // check that all maps contain the total number of elements
- int numContained = 0;
- for (KeyMap<?, ?> map : maps) {
- numContained += map.size();
- }
- assertEquals(totalNumElements, numContained);
-
- // ------ check that all elements can be found in the maps ------
- keyRnd.setSeed(keySeed);
-
- numContained = 0;
- nextKeyValue = 1;
- for (int i = 0; i < numKeys; i++) {
- int numCopiesExpected = (nextKeyValue % 3) + 1;
- int numCopiesContained = 0;
-
- for (KeyMap<Integer, Integer> map : maps) {
- Integer val = map.get(nextKeyValue);
- if (val != null) {
- assertEquals(nextKeyValue, val.intValue());
- numCopiesContained++;
- }
- }
-
- assertEquals(numCopiesExpected, numCopiesContained);
- numContained += numCopiesContained;
-
- nextKeyValue += keyRnd.nextInt(maxStride) + 1;
- }
- assertEquals(totalNumElements, numContained);
-
- // ------ make a traversal over all keys and validate the keys in the traversal ------
- final int[] keysStartedAndFinished = { 0, 0 };
- KeyMap.TraversalEvaluator<Integer, Integer> traversal = new KeyMap.TraversalEvaluator<Integer, Integer>() {
-
- private int key;
- private int valueCount;
-
- @Override
- public void startNewKey(Integer key) {
- this.key = key;
- this.valueCount = 0;
-
- keysStartedAndFinished[0]++;
- }
-
- @Override
- public void nextValue(Integer value) {
- assertEquals(this.key, value.intValue());
- this.valueCount++;
- }
-
- @Override
- public void keyDone() {
- int expected = (key % 3) + 1;
- if (expected != valueCount) {
- fail("Wrong count for key " + key + " ; expected=" + expected + " , count=" + valueCount);
- }
-
- keysStartedAndFinished[1]++;
- }
- };
-
- KeyMap.traverseMaps(shuffleArray(maps, rootRnd), traversal, 17);
-
- assertEquals(numKeys, keysStartedAndFinished[0]);
- assertEquals(numKeys, keysStartedAndFinished[1]);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSizeComparator() {
- try {
- KeyMap<String, String> map1 = new KeyMap<>(5);
- KeyMap<String, String> map2 = new KeyMap<>(80);
-
- assertTrue(map1.getCurrentTableCapacity() < map2.getCurrentTableCapacity());
-
- assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map1) == 0);
- assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map2) == 0);
- assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map1, map2) > 0);
- assertTrue(KeyMap.CapacityDescendingComparator.INSTANCE.compare(map2, map1) < 0);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static int drawPosProportionally(double[] array, double totalProbability, Random rnd) {
- double val = rnd.nextDouble() * totalProbability;
-
- double accum = 0;
- for (int i = 0; i < array.length; i++) {
- accum += array[i];
- if (val <= accum && array[i] > 0.0) {
- return i;
- }
- }
-
- // in case of rounding errors
- return array.length - 1;
- }
-
- private static <E> E[] shuffleArray(E[] array, Random rnd) {
- E[] target = Arrays.copyOf(array, array.length);
-
- for (int i = target.length - 1; i > 0; i--) {
- int swapPos = rnd.nextInt(i + 1);
- E temp = target[i];
- target[i] = target[swapPos];
- target[swapPos] = temp;
- }
-
- return target;
- }
-}