You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/09 12:16:38 UTC
[02/10] flink git commit: [FLINK-2780] Remove Old Windowing Logic and API
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
deleted file mode 100644
index ce312d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class JumpingTimePreReducerTest {
-
- TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
- ReduceFunction<Integer> reducer = new SumReducer();
-
- @Test
- public void testEmitWindow() throws Exception {
-
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
- List<StreamWindow<Integer>> collected = collector.getCollected();
-
- WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(
- reducer, serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- wb.store(1);
- wb.store(2);
- wb.store(3);
- wb.evict(1);
- wb.emitWindow(collector);
-
- assertEquals(1, collected.size());
- assertEquals(StreamWindow.fromElements(5),
- collected.get(0));
-
- wb.store(4);
- wb.store(5);
-
- // Nothing should happen here
- wb.evict(2);
-
- wb.store(6);
-
- wb.emitWindow(collector);
- wb.evict(2);
- wb.emitWindow(collector);
- wb.store(12);
- wb.emitWindow(collector);
-
- assertEquals(3, collected.size());
- assertEquals(StreamWindow.fromElements(11),
- collected.get(1));
- assertEquals(StreamWindow.fromElements(12),
- collected.get(2));
- }
-
- private static class SumReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
deleted file mode 100644
index 7f58527..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingCountGroupedPreReducerTest {
-
- TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
- ReduceFunction<Integer> reducer = new SumReducer();
-
- KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
-
- @Test
- public void testPreReduce1() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
- reducer, serializer, key, 3, 2, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(3, 6));
- expected.add(StreamWindow.fromElements(5, 10));
- expected.add(StreamWindow.fromElements(7, 14));
- expected.add(StreamWindow.fromElements(9, 18));
- expected.add(StreamWindow.fromElements(11, 22));
-
- checkResults(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce2() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
- reducer, serializer, key, 5, 2, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(4, 6));
- expected.add(StreamWindow.fromElements(12, 8));
- expected.add(StreamWindow.fromElements(18, 12));
- expected.add(StreamWindow.fromElements(24, 16));
- expected.add(StreamWindow.fromElements(30, 20));
-
- checkResults(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce3() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
- reducer, serializer, key, 6, 3, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.store(9);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(10);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(2, 4));
- expected.add(StreamWindow.fromElements(9, 12));
- expected.add(StreamWindow.fromElements(21, 18));
- expected.add(StreamWindow.fromElements(30, 27));
-
- checkResults(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce4() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
- reducer, serializer, key, 5, 1, 2);
-
- preReducer.store(1);
- preReducer.evict(1);
- preReducer.store(1);
- preReducer.evict(1);
- preReducer.store(1);
- preReducer.emitWindow(collector);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(7);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1));
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(4, 2));
- expected.add(StreamWindow.fromElements(4, 6));
- expected.add(StreamWindow.fromElements(9, 6));
- expected.add(StreamWindow.fromElements(8, 12));
- expected.add(StreamWindow.fromElements(15, 10));
- expected.add(StreamWindow.fromElements(12, 18));
-
- checkResults(expected, collector.getCollected());
- }
-
- private static class SumReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
-
- protected static void checkResults(List<StreamWindow<Integer>> expected,
- List<StreamWindow<Integer>> actual) {
-
- for (StreamWindow<Integer> sw : expected) {
- Collections.sort(sw);
- }
-
- for (StreamWindow<Integer> sw : actual) {
- Collections.sort(sw);
- }
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
deleted file mode 100644
index 156b875..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingCountPreReducerTest {
-
- TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
-
- ReduceFunction<Integer> reducer = new SumReducer();
-
- @Test
- public void testPreReduce1() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
- serializer, 3, 2, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(9));
- expected.add(StreamWindow.fromElements(15));
- expected.add(StreamWindow.fromElements(21));
- expected.add(StreamWindow.fromElements(27));
- expected.add(StreamWindow.fromElements(33));
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce2() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
- serializer, 5, 2, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(10));
- expected.add(StreamWindow.fromElements(20));
- expected.add(StreamWindow.fromElements(30));
- expected.add(StreamWindow.fromElements(40));
- expected.add(StreamWindow.fromElements(50));
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce3() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
- serializer, 6, 3, 0);
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.store(9);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(10);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(6));
- expected.add(StreamWindow.fromElements(21));
- expected.add(StreamWindow.fromElements(39));
- expected.add(StreamWindow.fromElements(57));
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce4() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
- serializer, 5, 1, 2);
-
- preReducer.store(1);
- preReducer.evict(1);
- preReducer.store(1);
- preReducer.evict(1);
- preReducer.store(1);
- preReducer.emitWindow(collector);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(7);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1));
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(6));
- expected.add(StreamWindow.fromElements(10));
- expected.add(StreamWindow.fromElements(15));
- expected.add(StreamWindow.fromElements(20));
- expected.add(StreamWindow.fromElements(25));
- expected.add(StreamWindow.fromElements(30));
-
- assertEquals(expected, collector.getCollected());
- }
-
- private static class SumReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
deleted file mode 100644
index 68bceda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingTimeGroupedPreReducerTest {
-
- TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
- TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
-
-
- ReduceFunction<Integer> reducer = new SumReducer();
- ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
-
-
- KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
- KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPreReduce1() throws Exception {
- // This ensures that the buffer is properly cleared after a burst of elements by
- // replaying the same sequence of elements with a later timestamp and expecting the same
- // result.
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-
- SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
- tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Tuple2<Integer, Integer> value) {
- return value.f0;
- }
- }, 1));
-
- int timeOffset = 0;
-
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
- // ensure that everything is cleared out
- preReducer.evict(100);
-
-
- timeOffset = 25; // a little while later...
-
- // Repeat the same sequence, this should produce the same result
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
- List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
- timeOffset = 0; // rewind ...
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 2, 2),
- new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 2, 6),
- new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 4, 10),
- new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 6, 14),
- new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 8, 18),
- new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 10, 22),
- new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
-
- timeOffset = 25; // and back to the future ...
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 2, 2),
- new Tuple2<Integer, Integer>(timeOffset + 1, 1)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 2, 6),
- new Tuple2<Integer, Integer>(timeOffset + 3, 3)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 4, 10),
- new Tuple2<Integer, Integer>(timeOffset + 5, 5)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 6, 14),
- new Tuple2<Integer, Integer>(timeOffset + 7, 7)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 8, 18),
- new Tuple2<Integer, Integer>(timeOffset + 9, 9)));
- expected.add(StreamWindow.fromElements(
- new Tuple2<Integer, Integer>(timeOffset + 10, 22),
- new Tuple2<Integer, Integer>(timeOffset + 11, 11)));
-
- assertEquals(expected, collector.getCollected());
- }
-
- protected static void checkResults(List<StreamWindow<Integer>> expected,
- List<StreamWindow<Integer>> actual) {
-
- for (StreamWindow<Integer> sw : expected) {
- Collections.sort(sw);
- }
-
- for (StreamWindow<Integer> sw : actual) {
- Collections.sort(sw);
- }
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void testPreReduce2() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
- reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
- new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(4, 6));
- expected.add(StreamWindow.fromElements(12, 8));
- expected.add(StreamWindow.fromElements(18, 12));
- expected.add(StreamWindow.fromElements(24, 16));
- expected.add(StreamWindow.fromElements(30, 20));
-
- checkResults(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce3() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
- reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
- new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.store(9);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(10);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(2, 4));
- expected.add(StreamWindow.fromElements(9, 12));
- expected.add(StreamWindow.fromElements(21, 18));
- expected.add(StreamWindow.fromElements(30, 27));
-
- checkResults(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce4() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
- reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
- new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(14);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.store(21);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
-
- preReducer.store(9);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(1, 2));
- expected.add(StreamWindow.fromElements(3, 6));
- expected.add(StreamWindow.fromElements(5, 10));
- expected.add(StreamWindow.fromElements(7, 14));
- expected.add(StreamWindow.fromElements(8));
- expected.add(StreamWindow.fromElements(8));
- expected.add(StreamWindow.fromElements(14));
- expected.add(StreamWindow.fromElements(14));
- expected.add(StreamWindow.fromElements(21));
-
- checkResults(expected, collector.getCollected());
- }
-
- private static class SumReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
- private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
- return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
- }
-
- }
-
- public static class TupleModKey implements KeySelector<Tuple2<Integer, Integer>, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- private int m;
-
- public TupleModKey(int m) {
- this.m = m;
- }
-
- @Override
- public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
- return value.f1 % m;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
deleted file mode 100644
index 6a36c57..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class SlidingTimePreReducerTest {
-
- TypeSerializer<Integer> serializer = TypeExtractor.getForObject(1).createSerializer(null);
- TypeInformation<Tuple2<Integer,Integer>> tupleType = TypeInfoParser.parse("Tuple2<Integer,Integer>");
-
- ReduceFunction<Integer> reducer = new SumReducer();
- ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
-
- @Test
- @SuppressWarnings("unchecked")
- public void testPreReduce1() throws Exception {
- // This ensures that the buffer is properly cleared after a burst of elements by
- // replaying the same sequence of elements with a later timestamp and expecting the same
- // result.
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
-
- SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
- tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Tuple2<Integer, Integer> value) {
- return value.f0;
- }
- }, 1));
-
- int timeOffset = 0;
-
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
- // ensure that everything is cleared out
- preReducer.evict(100);
-
-
- timeOffset = 25; // a little while later...
-
- // Repeat the same sequence, this should produce the same result
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 1, 1));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 2, 2));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 3, 3));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 4, 4));
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 5, 5));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 6, 6));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 7, 7));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 8, 8));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 9, 9));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 10, 10));
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 11, 11));
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 12, 12));
- preReducer.emitWindow(collector);
- preReducer.store(new Tuple2<Integer, Integer>(timeOffset + 13, 13));
-
- List<StreamWindow<Tuple2<Integer, Integer>>> expected = new ArrayList<StreamWindow<Tuple2<Integer, Integer>>>();
- timeOffset = 0; // rewind ...
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
-
- timeOffset = 25; // and back to the future ...
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 1, 3)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 2, 9)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 4, 15)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 6, 21)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 8, 27)));
- expected.add(StreamWindow.fromElements(new Tuple2<Integer, Integer>(timeOffset + 10, 33)));
-
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce2() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
- serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.emitWindow(collector);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(9);
- preReducer.store(10);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(10));
- expected.add(StreamWindow.fromElements(20));
- expected.add(StreamWindow.fromElements(30));
- expected.add(StreamWindow.fromElements(40));
- expected.add(StreamWindow.fromElements(50));
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce3() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
- serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.store(3);
- preReducer.emitWindow(collector);
- preReducer.store(4);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.store(9);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(10);
- preReducer.store(11);
- preReducer.store(12);
- preReducer.emitWindow(collector);
- preReducer.evict(3);
- preReducer.store(13);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(6));
- expected.add(StreamWindow.fromElements(21));
- expected.add(StreamWindow.fromElements(39));
- expected.add(StreamWindow.fromElements(57));
-
- assertEquals(expected, collector.getCollected());
- }
-
- @Test
- public void testPreReduce4() throws Exception {
- TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
-
- SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
- serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- }, 1));
-
- preReducer.store(1);
- preReducer.store(2);
- preReducer.emitWindow(collector);
- preReducer.store(3);
- preReducer.store(4);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(5);
- preReducer.store(6);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(7);
- preReducer.store(8);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.evict(2);
- preReducer.store(14);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
- preReducer.emitWindow(collector);
- preReducer.store(21);
- preReducer.emitWindow(collector);
- preReducer.evict(1);
- preReducer.emitWindow(collector);
-
- preReducer.store(9);
-
- List<StreamWindow<Integer>> expected = new ArrayList<StreamWindow<Integer>>();
- expected.add(StreamWindow.fromElements(3));
- expected.add(StreamWindow.fromElements(9));
- expected.add(StreamWindow.fromElements(15));
- expected.add(StreamWindow.fromElements(21));
- expected.add(StreamWindow.fromElements(8));
- expected.add(StreamWindow.fromElements(8));
- expected.add(StreamWindow.fromElements(14));
- expected.add(StreamWindow.fromElements(14));
- expected.add(StreamWindow.fromElements(21));
-
- assertEquals(expected, collector.getCollected());
- }
-
- private static class SumReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
- private static class TupleSumReducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
- return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
deleted file mode 100644
index 3aee288..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.operators.Keys;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.junit.Test;
-
-public class TumblingGroupedPreReducerTest {
-
- TypeInformation<Tuple2<Integer, Integer>> type = TypeExtractor
- .getForObject(new Tuple2<Integer, Integer>(1, 1));
- TypeSerializer<Tuple2<Integer, Integer>> serializer = type.createSerializer(null);
-
- KeySelector<Tuple2<Integer, Integer>, ?> key = KeySelectorUtil.getSelectorForKeys(
- new Keys.ExpressionKeys<Tuple2<Integer, Integer>>(new int[] { 0 }, type), type, null);
-
- Reducer reducer = new Reducer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(0, 0));
- inputs.add(new Tuple2<Integer, Integer>(1, -1));
- inputs.add(new Tuple2<Integer, Integer>(1, -2));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
- reducer, key, serializer);
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.emitWindow(collector);
- wb.evict(2);
-
- assertEquals(1, collected.size());
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(1, 1),
- new Tuple2<Integer, Integer>(0, 0)), collected.get(0));
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
-
- wb.store(serializer.copy(inputs.get(3)));
-
- wb.emitWindow(collector);
- wb.evict(4);
-
- assertEquals(2, collected.size());
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, -2),
- new Tuple2<Integer, Integer>(0, 0)), collected.get(1));
-
- // Test whether function is mutating inputs or not
- assertEquals(2, reducer.allInputs.size());
- assertEquals(reducer.allInputs.get(0), inputs.get(2));
- assertEquals(reducer.allInputs.get(1), inputs.get(3));
-
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow2() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(0, 0));
- inputs.add(new Tuple2<Integer, Integer>(1, -1));
- inputs.add(new Tuple2<Integer, Integer>(1, -2));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
- reducer, key, serializer).sequentialID();
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.emitWindow(collector);
- wb.evict(2);
-
- assertSetEquals(StreamWindow.fromElements(inputs.get(0), inputs.get(1)), collected.get(0));
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
- wb.emitWindow(collector);
- wb.evict(3);
-
- assertSetEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(2, 0), inputs.get(1)), collected.get(1));
-
-
- }
-
- private static <T> void assertSetEquals(Collection<T> first, Collection<T> second) {
- assertEquals(new HashSet<T>(first), new HashSet<T>(second));
- }
-
- @SuppressWarnings("serial")
- private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) throws Exception {
- allInputs.add(value2);
- value1.f0 = value1.f0 + value2.f0;
- value1.f1 = value1.f1 + value2.f1;
- return value1;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
deleted file mode 100644
index 3e537a5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.windowing.windowbuffer;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
-import org.junit.Test;
-
-public class TumblingPreReducerTest {
-
- TypeSerializer<Tuple2<Integer, Integer>> serializer = TypeExtractor.getForObject(
- new Tuple2<Integer, Integer>(1, 1)).createSerializer(null);
-
- Reducer reducer = new Reducer();
-
- @SuppressWarnings("unchecked")
- @Test
- public void testEmitWindow() throws Exception {
-
- List<Tuple2<Integer, Integer>> inputs = new ArrayList<Tuple2<Integer, Integer>>();
- inputs.add(new Tuple2<Integer, Integer>(1, 1));
- inputs.add(new Tuple2<Integer, Integer>(2, 0));
- inputs.add(new Tuple2<Integer, Integer>(3, -1));
- inputs.add(new Tuple2<Integer, Integer>(4, -2));
-
- TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
- List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
-
- WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>(
- reducer, serializer);
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
-
- wb.emitWindow(collector);
- wb.evict(2);
-
- assertEquals(1, collected.size());
- assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(3, 1)),
- collected.get(0));
-
- wb.store(serializer.copy(inputs.get(0)));
- wb.store(serializer.copy(inputs.get(1)));
- wb.store(serializer.copy(inputs.get(2)));
-
- wb.store(serializer.copy(inputs.get(3)));
-
- wb.emitWindow(collector);
- wb.evict(4);
-
- assertEquals(2, collected.size());
- assertEquals(StreamWindow.fromElements(new Tuple2<Integer, Integer>(10, -2)),
- collected.get(1));
-
- // Test whether function is mutating inputs or not
- assertEquals(4, reducer.allInputs.size());
- assertEquals(reducer.allInputs.get(0), inputs.get(1));
- assertEquals(reducer.allInputs.get(1), inputs.get(1));
- assertEquals(reducer.allInputs.get(2), inputs.get(2));
- assertEquals(reducer.allInputs.get(3), inputs.get(3));
-
- }
-
- @SuppressWarnings("serial")
- private class Reducer implements ReduceFunction<Tuple2<Integer, Integer>> {
-
- public List<Tuple2<Integer, Integer>> allInputs = new ArrayList<Tuple2<Integer, Integer>>();
-
- @Override
- public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) throws Exception {
- allInputs.add(value2);
- value1.f0 = value1.f0 + value2.f0;
- value1.f1 = value1.f1 + value2.f1;
- return value1;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 99d45bb..4c73e44 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -17,32 +17,35 @@
package org.apache.flink.streaming.examples.ml;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
+import java.util.concurrent.TimeUnit;
+
/**
* Skeleton for incremental machine learning algorithm consisting of a
* pre-computed model, which gets updated for the new inputs and new input data
* for which the job provides predictions.
- * <p/>
+ *
* <p>
* This may serve as a base of a number of algorithms, e.g. updating an
* incremental Alternating Least Squares model while also providing the
* predictions.
- * </p>
- * <p/>
+ *
* <p/>
* This example shows how to use:
* <ul>
- * <li>Connected streams
- * <li>CoFunctions
- * <li>Tuple data types
+ * <li>Connected streams
+ * <li>CoFunctions
+ * <li>Tuple data types
* </ul>
*/
public class IncrementalLearningSkeleton {
@@ -61,12 +64,16 @@ public class IncrementalLearningSkeleton {
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
trainingData = env.addSource(new FiniteTrainingDataSource());
newData = env.addSource(new FiniteNewDataSource());
// build new model on every second of new data
- DataStream<Double[]> model = trainingData.window(Time.of(5000, new LinearTimestamp()))
- .mapWindow(new PartialModelBuilder()).flatten();
+ DataStream<Double[]> model = trainingData
+ .extractTimestamp(new LinearTimestamp())
+ .timeWindowAll(Time.of(5000, TimeUnit.MILLISECONDS))
+ .apply(new PartialModelBuilder());
// use partial model for newData
DataStream<Integer> prediction = newData.connect(model).map(new Predictor());
@@ -140,21 +147,32 @@ public class IncrementalLearningSkeleton {
}
}
- public static class LinearTimestamp implements Timestamp<Integer> {
+ public static class LinearTimestamp implements TimestampExtractor<Integer> {
private static final long serialVersionUID = 1L;
private long counter = 0L;
@Override
- public long getTimestamp(Integer value) {
+ public long extractTimestamp(Integer element, long currentTimestamp) {
return counter += 10L;
}
+
+ @Override
+ public long emitWatermark(Integer element, long currentTimestamp) {
+ return counter - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
+ }
+
}
/**
* Builds up-to-date partial models on new training data.
*/
- public static class PartialModelBuilder implements WindowMapFunction<Integer, Double[]> {
+ public static class PartialModelBuilder implements AllWindowFunction<Integer, Double[], TimeWindow> {
private static final long serialVersionUID = 1L;
protected Double[] buildPartialModel(Iterable<Integer> values) {
@@ -162,7 +180,7 @@ public class IncrementalLearningSkeleton {
}
@Override
- public void mapWindow(Iterable<Integer> values, Collector<Double[]> out) throws Exception {
+ public void apply(TimeWindow window, Iterable<Integer> values, Collector<Double[]> out) throws Exception {
out.collect(buildPartialModel(values));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
index dedc5ee..8a6cd88 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/util/IncrementalLearningSkeletonData.java
@@ -19,15 +19,13 @@ package org.apache.flink.streaming.examples.ml.util;
public class IncrementalLearningSkeletonData {
- public static final String RESULTS = "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n"
- + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
- "0\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n"
- + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
- "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
- "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
- "1\n";
+ public static final String RESULTS = "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" +
+ "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "1\n" + "0\n" + "0\n" +
+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+ "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" + "0\n" +
+ "0\n" + "0\n" + "0\n" + "0\n";
private IncrementalLearningSkeletonData() {
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 4730cc1..950b0f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -18,11 +18,14 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import java.util.ArrayList;
import java.util.List;
@@ -37,37 +40,36 @@ public class SessionWindowing {
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(2);
- final List<Tuple3<String, Long, Integer>> input = new ArrayList<Tuple3<String, Long, Integer>>();
+ final List<Tuple3<String, Long, Integer>> input = new ArrayList<>();
- input.add(new Tuple3<String, Long, Integer>("a", 1L, 1));
- input.add(new Tuple3<String, Long, Integer>("b", 1L, 1));
- input.add(new Tuple3<String, Long, Integer>("b", 3L, 1));
- input.add(new Tuple3<String, Long, Integer>("b", 5L, 1));
- input.add(new Tuple3<String, Long, Integer>("c", 6L, 1));
+ input.add(new Tuple3<>("a", 1L, 1));
+ input.add(new Tuple3<>("b", 1L, 1));
+ input.add(new Tuple3<>("b", 3L, 1));
+ input.add(new Tuple3<>("b", 5L, 1));
+ input.add(new Tuple3<>("c", 6L, 1));
// We expect to detect the session "a" earlier than this point (the old
// functionality can only detect here when the next starts)
- input.add(new Tuple3<String, Long, Integer>("a", 10L, 1));
+ input.add(new Tuple3<>("a", 10L, 1));
// We expect to detect session "b" and "c" at this point as well
- input.add(new Tuple3<String, Long, Integer>("c", 11L, 1));
+ input.add(new Tuple3<>("c", 11L, 1));
DataStream<Tuple3<String, Long, Integer>> source = env
- .addSource(new SourceFunction<Tuple3<String, Long, Integer>>() {
+ .addSource(new EventTimeSourceFunction<Tuple3<String,Long,Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void run(SourceContext<Tuple3<String, Long, Integer>> ctx) throws Exception {
for (Tuple3<String, Long, Integer> value : input) {
- // We sleep three seconds between every output so we
- // can see whether we properly detect sessions
- // before the next start for a specific id
- ctx.collect(value);
+ ctx.collectWithTimestamp(value, value.f1);
+ ctx.emitWatermark(new Watermark(value.f1 - 1));
if (!fileOutput) {
System.out.println("Collected: " + value);
- Thread.sleep(3000);
}
}
+ ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
}
@Override
@@ -76,10 +78,11 @@ public class SessionWindowing {
});
// We create sessions for each id with max timeout of 3 time units
- DataStream<Tuple3<String, Long, Integer>> aggregated = source.keyBy(0)
- .window(new SessionTriggerPolicy(3L),
- new TumblingEvictionPolicy<Tuple3<String, Long, Integer>>()).sum(2)
- .flatten();
+ DataStream<Tuple3<String, Long, Integer>> aggregated = source
+ .keyBy(0)
+ .window(GlobalWindows.create())
+ .trigger(new SessionTrigger(3L))
+ .sum(2);
if (fileOutput) {
aggregated.writeAsText(outputPath);
@@ -90,55 +93,46 @@ public class SessionWindowing {
env.execute();
}
- private static class SessionTriggerPolicy implements
- CentralActiveTrigger<Tuple3<String, Long, Integer>> {
+ private static class SessionTrigger implements Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
private static final long serialVersionUID = 1L;
private volatile Long lastSeenEvent = 1L;
private Long sessionTimeout;
- public SessionTriggerPolicy(Long sessionTimeout) {
+ public SessionTrigger(Long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
- public boolean notifyTrigger(Tuple3<String, Long, Integer> datapoint) {
-
- Long eventTimestamp = datapoint.f1;
- Long timeSinceLastEvent = eventTimestamp - lastSeenEvent;
+ public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) {
+ Long timeSinceLastEvent = timestamp - lastSeenEvent;
// Update the last seen event time
- lastSeenEvent = eventTimestamp;
+ lastSeenEvent = timestamp;
+
+ ctx.registerWatermarkTimer(lastSeenEvent + sessionTimeout);
if (timeSinceLastEvent > sessionTimeout) {
- return true;
+ return TriggerResult.FIRE_AND_PURGE;
} else {
- return false;
+ return TriggerResult.CONTINUE;
}
}
@Override
- public Object[] notifyOnLastGlobalElement(Tuple3<String, Long, Integer> datapoint) {
- Long eventTimestamp = datapoint.f1;
- Long timeSinceLastEvent = eventTimestamp - lastSeenEvent;
-
- // Here we dont update the last seen event time because this data
- // belongs to a different group
-
- if (timeSinceLastEvent > sessionTimeout) {
- return new Object[]{datapoint};
- } else {
- return null;
+ public TriggerResult onTime(long time, TriggerContext ctx) {
+ if (time - lastSeenEvent >= sessionTimeout) {
+ return TriggerResult.FIRE_AND_PURGE;
}
+ return TriggerResult.CONTINUE;
}
@Override
- public SessionTriggerPolicy clone() {
- return new SessionTriggerPolicy(sessionTimeout);
+ public SessionTrigger duplicate() {
+ return new SessionTrigger(sessionTimeout);
}
-
}
// *************************************************************************
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 55d48dd..a46ffd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -19,16 +19,20 @@ package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
-import org.apache.flink.streaming.api.windowing.helper.Delta;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
import java.util.Arrays;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
/**
* An example of grouped stream windowing where different eviction and trigger
@@ -52,29 +56,37 @@ public class TopSpeedWindowing {
}
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@SuppressWarnings({"rawtypes", "serial"})
DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
+
if (fileInput) {
carData = env.readTextFile(inputPath).map(new ParseCarData());
} else {
carData = env.addSource(CarSource.create(numOfCars));
}
- DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData.keyBy(0)
- .window(Time.of(evictionSec * 1000, new CarTimestamp()))
- .every(Delta.of(triggerMeters,
+
+ DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds = carData
+ .extractTimestamp(new CarTimestamp())
+ .keyBy(0)
+ .window(GlobalWindows.create())
+ .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
+ .trigger(DeltaTrigger.of(triggerMeters,
new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
private static final long serialVersionUID = 1L;
-
@Override
public double getDelta(
Tuple4<Integer, Integer, Double, Long> oldDataPoint,
Tuple4<Integer, Integer, Double, Long> newDataPoint) {
return newDataPoint.f2 - oldDataPoint.f2;
}
- }, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l))).local().maxBy(1).flatten();
+ }))
+ .maxBy(1);
+
if (fileOutput) {
+ topSpeeds.print();
topSpeeds.writeAsText(outputPath);
} else {
topSpeeds.print();
@@ -143,17 +155,28 @@ public class TopSpeedWindowing {
public Tuple4<Integer, Integer, Double, Long> map(String record) {
String rawData = record.substring(1, record.length() - 1);
String[] data = rawData.split(",");
- return new Tuple4<Integer, Integer, Double, Long>(Integer.valueOf(data[0]),
- Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
+ return new Tuple4<>(Integer.valueOf(data[0]), Integer.valueOf(data[1]), Double.valueOf(data[2]), Long.valueOf(data[3]));
}
}
- private static class CarTimestamp implements Timestamp<Tuple4<Integer, Integer, Double, Long>> {
+ private static class CarTimestamp implements TimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
private static final long serialVersionUID = 1L;
@Override
- public long getTimestamp(Tuple4<Integer, Integer, Double, Long> value) {
- return value.f3;
+ public long extractTimestamp(Tuple4<Integer, Integer, Double, Long> element,
+ long currentTimestamp) {
+ return element.f3;
+ }
+
+ @Override
+ public long emitWatermark(Tuple4<Integer, Integer, Double, Long> element,
+ long currentTimestamp) {
+ return element.f3 - 1;
+ }
+
+ @Override
+ public long getCurrentWatermark() {
+ return Long.MIN_VALUE;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c1141ab/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 023a36a..04352d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -21,7 +21,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
+import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
+import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.examples.wordcount.WordCount;
/**
@@ -70,11 +72,12 @@ public class WindowWordCount {
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new WordCount.Tokenizer())
// key by tuple field "0" and create windows of windowSize records that slide every slideSize records
- .window(Count.of(windowSize)).every(Count.of(slideSize))
+ .keyBy(0)
+ .window(GlobalWindows.create())
+ .evictor(CountEvictor.of(windowSize))
+ .trigger(CountTrigger.of(slideSize))
// sum up tuple field "1" for each key (tuple field "0")
- .keyBy(0).sum(1)
- // flatten the windows to a single stream
- .flatten();
+ .sum(1);
// emit result
if (fileOutput) {