You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:37 UTC
[21/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
deleted file mode 100644
index b94e530..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
-import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
-import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-@RunWith(Parameterized.class)
-public class WindowOperatorTest {
-
- @SuppressWarnings("unchecked,rawtypes")
- private WindowBufferFactory windowBufferFactory;
-
- public WindowOperatorTest(WindowBufferFactory<?, ?> windowBufferFactory) {
- this.windowBufferFactory = windowBufferFactory;
- }
-
- // For counting if close() is called the correct number of times on the SumReducer
- private static AtomicInteger closeCalled = new AtomicInteger(0);
-
- @Test
- @SuppressWarnings("unchecked")
- public void testSlidingEventTimeWindows() throws Exception {
- closeCalled.set(0);
-
- final int WINDOW_SIZE = 3;
- final int WINDOW_SLIDE = 1;
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- windowBufferFactory,
- new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
- EventTimeTrigger.create());
-
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- // add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
- testHarness.processWatermark(new Watermark(initialTime + 999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 999));
- expectedOutput.add(new Watermark(999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- testHarness.processWatermark(new Watermark(initialTime + 1999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 1999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 1999));
- expectedOutput.add(new Watermark(1999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
- expectedOutput.add(new Watermark(2999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 3999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), initialTime + 3999));
- expectedOutput.add(new Watermark(3999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 4999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 4999));
- expectedOutput.add(new Watermark(4999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 5999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
- expectedOutput.add(new Watermark(5999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- // those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 6999));
- testHarness.processWatermark(new Watermark(initialTime + 7999));
- expectedOutput.add(new Watermark(6999));
- expectedOutput.add(new Watermark(7999));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
- Assert.assertEquals("Close was not called.", 2, closeCalled.get());
- } else {
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
- }
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testTumblingEventTimeWindows() throws Exception {
- closeCalled.set(0);
-
- final int WINDOW_SIZE = 3;
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
- new TimeWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- windowBufferFactory,
- new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
- EventTimeTrigger.create());
-
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- // add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
- testHarness.processWatermark(new Watermark(initialTime + 999));
- expectedOutput.add(new Watermark(999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- testHarness.processWatermark(new Watermark(initialTime + 1999));
- expectedOutput.add(new Watermark(1999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), initialTime + 2999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 3), initialTime + 2999));
- expectedOutput.add(new Watermark(2999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 3999));
- expectedOutput.add(new Watermark(3999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 4999));
- expectedOutput.add(new Watermark(4999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 5999));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 2), initialTime + 5999));
- expectedOutput.add(new Watermark(5999));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- // those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 6999));
- testHarness.processWatermark(new Watermark(initialTime + 7999));
- expectedOutput.add(new Watermark(6999));
- expectedOutput.add(new Watermark(7999));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
- Assert.assertEquals("Close was not called.", 2, closeCalled.get());
- } else {
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
- }
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testContinuousWatermarkTrigger() throws Exception {
- closeCalled.set(0);
-
- final int WINDOW_SIZE = 3;
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
- GlobalWindows.create(),
- new GlobalWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- windowBufferFactory,
- new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
- ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
-
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- // The global window actually ignores these timestamps...
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
-
- // add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
- testHarness.processWatermark(new Watermark(initialTime + 1000));
- expectedOutput.add(new Watermark(1000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- testHarness.processWatermark(new Watermark(initialTime + 2000));
- expectedOutput.add(new Watermark(2000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 3000));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
- expectedOutput.add(new Watermark(3000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 4000));
- expectedOutput.add(new Watermark(4000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 5000));
- expectedOutput.add(new Watermark(5000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processWatermark(new Watermark(initialTime + 6000));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
- expectedOutput.add(new Watermark(6000));
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
-
- // those don't have any effect...
- testHarness.processWatermark(new Watermark(initialTime + 7000));
- testHarness.processWatermark(new Watermark(initialTime + 8000));
- expectedOutput.add(new Watermark(7000));
- expectedOutput.add(new Watermark(8000));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
- Assert.assertEquals("Close was not called.", 2, closeCalled.get());
- } else {
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
- }
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void testCountTrigger() throws Exception {
- closeCalled.set(0);
-
- final int WINDOW_SIZE = 4;
-
- WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
- GlobalWindows.create(),
- new GlobalWindow.Serializer(),
- new TupleKeySelector(),
- BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
- windowBufferFactory,
- new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
- PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)));
-
- operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse(
- "Tuple2<String, Integer>"), new ExecutionConfig());
-
- OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
- new OneInputStreamOperatorTestHarness<>(operator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- testHarness.open();
-
- // The global window actually ignores these timestamps...
-
- // add elements out-of-order
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 3999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 20));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1998));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1999));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
-
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), initialTime + 10999));
-
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
- testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), initialTime + 1000));
-
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 4), Long.MAX_VALUE));
- expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 4), Long.MAX_VALUE));
-
- TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new ResultSortComparator());
-
- testHarness.close();
- if (windowBufferFactory instanceof PreAggregatingHeapWindowBuffer.Factory) {
- Assert.assertEquals("Close was not called.", 2, closeCalled.get());
- } else {
- Assert.assertEquals("Close was not called.", 1, closeCalled.get());
- }
-
- }
-
- // ------------------------------------------------------------------------
- // UDFs
- // ------------------------------------------------------------------------
-
- public static class SumReducer extends RichReduceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- private boolean openCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- closeCalled.incrementAndGet();
- }
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
- Tuple2<String, Integer> value2) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called");
- }
- return new Tuple2<>(value2.f0, value1.f1 + value2.f1);
- }
- }
- // ------------------------------------------------------------------------
- // Parametrization for testing different window buffers
- // ------------------------------------------------------------------------
-
- @Parameterized.Parameters(name = "WindowBuffer = {0}")
- @SuppressWarnings("unchecked,rawtypes")
- public static Collection<WindowBufferFactory[]> windowBuffers(){
- return Arrays.asList(new WindowBufferFactory[]{new PreAggregatingHeapWindowBuffer.Factory(new SumReducer())},
- new WindowBufferFactory[]{new HeapWindowBuffer.Factory()}
- );
- }
-
- @SuppressWarnings("unchecked")
- private static class ResultSortComparator implements Comparator<Object> {
- @Override
- public int compare(Object o1, Object o2) {
- if (o1 instanceof Watermark || o2 instanceof Watermark) {
- return 0;
- } else {
- StreamRecord<Tuple2<String, Integer>> sr0 = (StreamRecord<Tuple2<String, Integer>>) o1;
- StreamRecord<Tuple2<String, Integer>> sr1 = (StreamRecord<Tuple2<String, Integer>>) o2;
- if (sr0.getTimestamp() != sr1.getTimestamp()) {
- return (int) (sr0.getTimestamp() - sr1.getTimestamp());
- }
- int comparison = sr0.getValue().f0.compareTo(sr1.getValue().f0);
- if (comparison != 0) {
- return comparison;
- } else {
- return sr0.getValue().f1 - sr1.getValue().f1;
- }
- }
- }
- }
-
- private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
deleted file mode 100644
index 13766a1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.operators.windowing;
-
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.WindowedStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
-import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
-import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * These tests verify that the api calls on
- * {@link WindowedStream} instantiate
- * the correct window operator.
- */
-public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
-
- /**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- */
- @Test
- public void testFastTimeWindows() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void apply(Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
- }
-
- /**
- * These tests ensure that the correct trigger is set when using event-time windows.
- */
- @Test
- @SuppressWarnings("rawtypes")
- public void testEventTime() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .reduce(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof WindowOperator);
- WindowOperator winOperator1 = (WindowOperator) operator1;
- Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
- Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void apply(Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
- Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testNonEvicting() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .trigger(CountTrigger.of(100))
- .reduce(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof WindowOperator);
- WindowOperator winOperator1 = (WindowOperator) operator1;
- Assert.assertTrue(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
- Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void apply(Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof WindowOperator);
- WindowOperator winOperator2 = (WindowOperator) operator2;
- Assert.assertTrue(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
- Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testEvicting() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
- DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
-
- DummyReducer reducer = new DummyReducer();
-
- DataStream<Tuple2<String, Integer>> window1 = source
- .keyBy(0)
- .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
- .evictor(CountEvictor.of(100))
- .reduce(reducer);
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
- EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
- Assert.assertFalse(winOperator1.isSetProcessingTime());
- Assert.assertTrue(winOperator1.getTrigger() instanceof EventTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
- Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
- Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
-
- DataStream<Tuple2<String, Integer>> window2 = source
- .keyBy(0)
- .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
- .trigger(CountTrigger.of(100))
- .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
- .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void apply(Tuple tuple,
- TimeWindow window,
- Iterable<Tuple2<String, Integer>> values,
- Collector<Tuple2<String, Integer>> out) throws Exception {
-
- }
- });
-
- OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
- OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
- EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
- Assert.assertFalse(winOperator2.isSetProcessingTime());
- Assert.assertTrue(winOperator2.getTrigger() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
- Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
- Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
- }
-
- // ------------------------------------------------------------------------
- // UDFs
- // ------------------------------------------------------------------------
-
- public static class DummyReducer extends RichReduceFunction<Tuple2<String, Integer>> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
- return value1;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
deleted file mode 100644
index a1cea13..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BroadcastPartitionerTest {
-
- private BroadcastPartitioner<Tuple> broadcastPartitioner1;
- private BroadcastPartitioner<Tuple> broadcastPartitioner2;
- private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
-
- @Before
- public void setPartitioner() {
- broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
- broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
- broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
-
- }
-
- @Test
- public void testSelectChannels() {
- int[] first = new int[] { 0 };
- int[] second = new int[] { 0, 1 };
- int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
- sd.setInstance(streamRecord);
- assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
- assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
- assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
deleted file mode 100644
index f7bd739..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ForwardPartitionerTest {
-
- private ForwardPartitioner<Tuple> forwardPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- forwardPartitioner = new ForwardPartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
- assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
- assertEquals(0, forwardPartitioner.selectChannels(sd, 2)[0]);
- assertEquals(0, forwardPartitioner.selectChannels(sd, 1024)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
deleted file mode 100644
index 6ae3730..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GlobalPartitionerTest {
-
- private GlobalPartitioner<Tuple> globalPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- globalPartitioner = new GlobalPartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannels() {
- int[] result = new int[] { 0 };
-
- sd.setInstance(streamRecord);
-
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
- assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
deleted file mode 100644
index 6dbf932..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/HashPartitionerTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class HashPartitionerTest {
-
- private HashPartitioner<Tuple2<String, Integer>> hashPartitioner;
- private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0));
- private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42));
- private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
- private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
-
- @Before
- public void setPartitioner() {
- hashPartitioner = new HashPartitioner<Tuple2<String, Integer>>(new KeySelector<Tuple2<String, Integer>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.getField(0);
- }
- });
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd1.setInstance(streamRecord1);
- assertEquals(1, hashPartitioner.selectChannels(sd1, 1).length);
- assertEquals(1, hashPartitioner.selectChannels(sd1, 2).length);
- assertEquals(1, hashPartitioner.selectChannels(sd1, 1024).length);
- }
-
- @Test
- public void testSelectChannelsGrouping() {
- sd1.setInstance(streamRecord1);
- sd2.setInstance(streamRecord2);
-
- assertArrayEquals(hashPartitioner.selectChannels(sd1, 1),
- hashPartitioner.selectChannels(sd2, 1));
- assertArrayEquals(hashPartitioner.selectChannels(sd1, 2),
- hashPartitioner.selectChannels(sd2, 2));
- assertArrayEquals(hashPartitioner.selectChannels(sd1, 1024),
- hashPartitioner.selectChannels(sd2, 1024));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
deleted file mode 100644
index aa70e8a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/RebalancePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class RebalancePartitionerTest {
-
- private RebalancePartitioner<Tuple> distributePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- distributePartitioner = new RebalancePartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
- assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
- assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
deleted file mode 100644
index aff177c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.partitioner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ShufflePartitionerTest {
-
- private ShufflePartitioner<Tuple> shufflePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
- private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
-
- @Before
- public void setPartitioner() {
- shufflePartitioner = new ShufflePartitioner<Tuple>();
- }
-
- @Test
- public void testSelectChannelsLength() {
- sd.setInstance(streamRecord);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
- assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
- }
-
- @Test
- public void testSelectChannelsInterval() {
- sd.setInstance(streamRecord);
- assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
-
- assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
- assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
-
- assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
- assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
deleted file mode 100644
index d48f7f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.streamrecord;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-public class StreamRecordSerializerTest {
-
- @Test
- public void testDeepDuplication() {
- try {
- @SuppressWarnings("unchecked")
- TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
- @SuppressWarnings("unchecked")
- TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
-
- when(serializer1.duplicate()).thenReturn(serializer2);
-
- StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(serializer1);
- assertEquals(serializer1, streamRecSer.getContainedTypeSerializer());
-
- StreamRecordSerializer<Long> copy = streamRecSer.duplicate();
- assertNotEquals(copy, streamRecSer);
- assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBasicProperties() {
- try {
- StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
-
- assertFalse(streamRecSer.isImmutableType());
- assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass());
- assertEquals(LongSerializer.INSTANCE.getLength(), streamRecSer.getLength());
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
deleted file mode 100644
index 4c6957b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link OneInputStreamTask}.
- *
- * <p>
- * Note:<br>
- * We only use a {@link StreamMap} operator here. We also test the individual operators but Map is
- * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all
- * OneInputStreamOperators.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class OneInputStreamTaskTest {
-
- /**
- * This test verifies that open() and close() are correctly called. This test also verifies
- * that timestamps of emitted elements are correct. {@link StreamMap} assigns the input
- * timestamp to emitted elements.
- */
- @Test
- public void testOpenCloseAndTimestamps() throws Exception {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
- streamConfig.setStreamOperator(mapOperator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.invoke();
-
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1));
- testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2));
- expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
- expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- expectedOutput,
- testHarness.getOutput());
- }
-
- /**
- * This test verifies that watermarks are correctly forwarded. This also checks whether
- * watermarks are forwarded only when we have received watermarks from all inputs. The
- * forwarded watermark must be the minimum of the watermarks of all inputs.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testWatermarkForwarding() throws Exception {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
- streamConfig.setStreamOperator(mapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processElement(new Watermark(initialTime), 0, 0);
- testHarness.processElement(new Watermark(initialTime), 0, 1);
- testHarness.processElement(new Watermark(initialTime), 1, 0);
-
- // now the output should still be empty
- testHarness.waitForInputProcessing();
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.processElement(new Watermark(initialTime), 1, 1);
-
- // now the watermark should have propagated, Map simply forward Watermarks
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime));
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- expectedOutput,
- testHarness.getOutput());
-
- // contrary to checkpoint barriers these elements are not blocked by watermarks
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
- testHarness.processElement(new StreamRecord<String>("Ciao", initialTime));
- expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao", initialTime));
-
- testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
- testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
- testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
- testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
-
- // check whether we get the minimum of all the watermarks, this must also only occur in
- // the output after the two StreamRecords
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime + 2));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
- // advance watermark from one of the inputs, now we should get a now one since the
- // minimum increases
- testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime + 3));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- // advance the other two inputs, now we should get a new one since the
- // minimum increases again
- testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
- testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime + 4));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(2, resultElements.size());
- }
-
- /**
- * This test verifies that checkpoint barriers are correctly forwarded.
- */
- @Test
- public void testCheckpointBarriers() throws Exception {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
- streamConfig.setStreamOperator(mapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
- // These elements should be buffered until we receive barriers from
- // all inputs
- testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
- // These elements should be forwarded, since we did not yet receive a checkpoint barrier
- // on that input, only add to same input, otherwise we would not know the ordering
- // of the output since the Task might read the inputs in any order
- testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
-
- testHarness.waitForInputProcessing();
- // we should not yet see the barrier, only the two elements from non-blocked input
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
- testHarness.waitForInputProcessing();
-
- // now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- /**
- * This test verifies that checkpoint barriers and barrier buffers work correctly with
- * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
- * some inputs receive barriers from an earlier checkpoint, thereby blocking,
- * then all inputs receive barriers from a later checkpoint.
- */
- @Test
- public void testOvertakingCheckpointBarriers() throws Exception {
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
- streamConfig.setStreamOperator(mapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
- // These elements should be buffered until we receive barriers from
- // all inputs
- testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
- // These elements should be forwarded, since we did not yet receive a checkpoint barrier
- // on that input, only add to same input, otherwise we would not know the ordering
- // of the output since the Task might read the inputs in any order
- testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
-
- testHarness.waitForInputProcessing();
- // we should not yet see the barrier, only the two elements from non-blocked input
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- // Now give a later barrier to all inputs, this should unblock the first channel,
- // thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
-
- expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
-
- testHarness.waitForInputProcessing();
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
- // Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
- testHarness.waitForInputProcessing();
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public String map(String value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value;
- }
- }
-
- private static class IdentityMap implements MapFunction<String, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(String value) throws Exception {
- return value;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
deleted file mode 100644
index 7fb8ba3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-
-import java.io.IOException;
-
-
-/**
- * Test harness for testing a {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
- * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
- * thread to finish. Use {@link #processElement} to send elements to the task. Use
- * {@link #processEvent(AbstractEvent)} to send events to the task.
- * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
- * that data entry is finished.
- *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
- * queues are empty. This must be used after entering some elements before checking the
- * desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
-
- private TypeInformation<IN> inputType;
- private TypeSerializer<IN> inputSerializer;
-
- /**
- * Creates a test harness with the specified number of input gates and specified number
- * of channels per input gate.
- */
- public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
- int numInputGates,
- int numInputChannelsPerGate,
- TypeInformation<IN> inputType,
- TypeInformation<OUT> outputType) {
- super(task, outputType);
-
- this.inputType = inputType;
- inputSerializer = inputType.createSerializer(executionConfig);
-
- this.numInputGates = numInputGates;
- this.numInputChannelsPerGate = numInputChannelsPerGate;
- }
-
- /**
- * Creates a test harness with one input gate that has one input channel.
- */
- public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
- TypeInformation<IN> inputType,
- TypeInformation<OUT> outputType) {
- this(task, 1, 1, inputType, outputType);
- }
-
- @Override
- protected void initializeInputs() throws IOException, InterruptedException {
- inputGates = new StreamTestSingleInputGate[numInputGates];
-
- for (int i = 0; i < numInputGates; i++) {
- inputGates[i] = new StreamTestSingleInputGate<IN>(
- numInputChannelsPerGate,
- bufferSize,
- inputSerializer);
- this.mockEnv.addInputGate(inputGates[i].getInputGate());
- }
-
-
- streamConfig.setNumberOfInputs(1);
- streamConfig.setTypeSerializerIn1(inputSerializer);
- }
-
-}
-