You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:13 UTC
[3/8] flink git commit: [FLINK-1967] Introduce (Event)time in
Streaming
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
deleted file mode 100644
index ec8cda8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.WindowMapFunction;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.FullStream;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class WindowIntegrationTest implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static final Integer MEMORYSIZE = 32;
-
- @SuppressWarnings("serial")
- public static class ModKey implements KeySelector<Integer, Integer> {
- private int m;
-
- public ModKey(int m) {
- this.m = m;
- }
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value % m;
- }
- }
-
- @SuppressWarnings("serial")
- public static class IdentityWindowMap implements
- WindowMapFunction<Integer, StreamWindow<Integer>> {
-
- @Override
- public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
- throws Exception {
-
- StreamWindow<Integer> window = new StreamWindow<Integer>();
-
- for (Integer value : values) {
- window.add(value);
- }
- out.collect(window);
- }
-
- }
-
- @SuppressWarnings("serial")
- @Test
- public void test() throws Exception {
-
- List<Integer> inputs = new ArrayList<Integer>();
- inputs.add(1);
- inputs.add(2);
- inputs.add(2);
- inputs.add(3);
- inputs.add(4);
- inputs.add(5);
- inputs.add(10);
- inputs.add(11);
- inputs.add(11);
-
- KeySelector<Integer, ?> key = new ModKey(2);
-
- Timestamp<Integer> ts = new Timestamp<Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Integer value) {
- return value;
- }
- };
-
- StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
- env.disableOperatorChaining();
-
- DataStream<Integer> source = env.fromCollection(inputs);
-
- source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new TestSink1());
-
- source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
- .flatten().addSink(new TestSink2());
-
- source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
- .addSink(new TestSink4());
-
- source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
- .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
-
- source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
- .addSink(new TestSink3());
-
- source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
- .addSink(new TestSink6());
-
- source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
- .addSink(new TestSink7());
-
- source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
- .getDiscretizedStream().addSink(new TestSink8());
-
- try {
- source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
- try {
- source.window(FullStream.window()).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
- try {
- source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
- fail();
- } catch (Exception e) {
- }
-
- source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
-
- source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
- .getDiscretizedStream().addSink(new TestSink12());
-
- DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 1; i <= 10; i++) {
- ctx.collect(i);
- }
- }
-
- @Override
- public void cancel() {
- }
- });
-
- DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
- private static final long serialVersionUID = 1L;
-
- private int i = 1;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- i = 1 + getRuntimeContext().getIndexOfThisSubtask();
- }
-
- @Override
- public void cancel() {
- }
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (;i < 11; i += 2) {
- ctx.collect(i);
- }
-
- }
- });
-
- source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
-
- source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
- .addSink(new TestSink10());
-
- source.map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- }).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
-
- env.execute();
-
- // sum ( Time of 3 slide 2 )
- List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
- expected1.add(StreamWindow.fromElements(5));
- expected1.add(StreamWindow.fromElements(11));
- expected1.add(StreamWindow.fromElements(9));
- expected1.add(StreamWindow.fromElements(10));
- expected1.add(StreamWindow.fromElements(32));
-
- validateOutput(expected1, TestSink1.windows);
-
- // Tumbling Time of 4 grouped by mod 2
- List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
- expected2.add(StreamWindow.fromElements(2, 2, 4));
- expected2.add(StreamWindow.fromElements(1, 3));
- expected2.add(StreamWindow.fromElements(5));
- expected2.add(StreamWindow.fromElements(10));
- expected2.add(StreamWindow.fromElements(11, 11));
-
- validateOutput(expected2, TestSink2.windows);
-
- // groupby mod 2 sum ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
- expected3.add(StreamWindow.fromElements(4));
- expected3.add(StreamWindow.fromElements(5));
- expected3.add(StreamWindow.fromElements(22));
- expected3.add(StreamWindow.fromElements(8));
- expected3.add(StreamWindow.fromElements(10));
-
- validateOutput(expected3, TestSink4.windows);
-
- // groupby mod3 Tumbling Count of 2 grouped by mod 2
- List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
- expected4.add(StreamWindow.fromElements(2, 2));
- expected4.add(StreamWindow.fromElements(1));
- expected4.add(StreamWindow.fromElements(4));
- expected4.add(StreamWindow.fromElements(5, 11));
- expected4.add(StreamWindow.fromElements(10));
- expected4.add(StreamWindow.fromElements(11));
- expected4.add(StreamWindow.fromElements(3));
-
- validateOutput(expected4, TestSink5.windows);
-
- // min ( Time of 2 slide 3 )
- List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
- expected5.add(StreamWindow.fromElements(1));
- expected5.add(StreamWindow.fromElements(4));
- expected5.add(StreamWindow.fromElements(10));
-
- validateOutput(expected5, TestSink3.windows);
-
- // groupby mod 2 max ( Tumbling Time of 4)
- List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
- expected6.add(StreamWindow.fromElements(3));
- expected6.add(StreamWindow.fromElements(5));
- expected6.add(StreamWindow.fromElements(11));
- expected6.add(StreamWindow.fromElements(4));
- expected6.add(StreamWindow.fromElements(10));
-
- validateOutput(expected6, TestSink6.windows);
-
- List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
- expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
- expected7.add(StreamWindow.fromElements(10));
- expected7.add(StreamWindow.fromElements(10, 11, 11));
-
- validateOutput(expected7, TestSink7.windows);
-
- List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
- expected8.add(StreamWindow.fromElements(4, 8));
- expected8.add(StreamWindow.fromElements(4, 5));
- expected8.add(StreamWindow.fromElements(10, 22));
-
- for (List<Integer> sw : TestSink8.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected8, TestSink8.windows);
-
- List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
- expected9.add(StreamWindow.fromElements(6));
- expected9.add(StreamWindow.fromElements(14));
- expected9.add(StreamWindow.fromElements(22));
- expected9.add(StreamWindow.fromElements(30));
- expected9.add(StreamWindow.fromElements(38));
-
- validateOutput(expected9, TestSink9.windows);
-
- List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
- expected10.add(StreamWindow.fromElements(6, 9));
- expected10.add(StreamWindow.fromElements(16, 24));
-
- for (List<Integer> sw : TestSink10.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected10, TestSink10.windows);
-
- List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
- expected11.add(StreamWindow.fromElements(8));
- expected11.add(StreamWindow.fromElements(38));
- expected11.add(StreamWindow.fromElements(49));
-
- for (List<Integer> sw : TestSink11.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected11, TestSink11.windows);
-
- List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
- expected12.add(StreamWindow.fromElements(4, 4));
- expected12.add(StreamWindow.fromElements(18, 20));
- expected12.add(StreamWindow.fromElements(18, 31));
-
- for (List<Integer> sw : TestSink12.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected12, TestSink12.windows);
-
- List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
- expected13.add(StreamWindow.fromElements(17));
- expected13.add(StreamWindow.fromElements(27));
- expected13.add(StreamWindow.fromElements(49));
-
- for (List<Integer> sw : TestSink13.windows) {
- Collections.sort(sw);
- }
-
- validateOutput(expected13, TestSink13.windows);
-
- }
-
- public static <R> void validateOutput(List<R> expected, List<R> actual) {
- assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
- }
-
- @SuppressWarnings("serial")
- private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
- @SuppressWarnings("serial")
- private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
-
- public static List<StreamWindow<Integer>> windows = Collections
- .synchronizedList(new ArrayList<StreamWindow<Integer>>());
-
- @Override
- public void invoke(StreamWindow<Integer> value) throws Exception {
- windows.add(value);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
new file mode 100644
index 0000000..5e6ffa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowingITCase.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.windowing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.FullStream;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class WindowingITCase implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Integer MEMORYSIZE = 32;
+
+ @SuppressWarnings("serial")
+ public static class ModKey implements KeySelector<Integer, Integer> {
+ private int m;
+
+ public ModKey(int m) {
+ this.m = m;
+ }
+
+ @Override
+ public Integer getKey(Integer value) throws Exception {
+ return value % m;
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static class IdentityWindowMap implements
+ WindowMapFunction<Integer, StreamWindow<Integer>> {
+
+ @Override
+ public void mapWindow(Iterable<Integer> values, Collector<StreamWindow<Integer>> out)
+ throws Exception {
+
+ StreamWindow<Integer> window = new StreamWindow<Integer>();
+
+ for (Integer value : values) {
+ window.add(value);
+ }
+ out.collect(window);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ @Test
+ public void test() throws Exception {
+
+ List<Integer> inputs = new ArrayList<Integer>();
+ inputs.add(1);
+ inputs.add(2);
+ inputs.add(2);
+ inputs.add(3);
+ inputs.add(4);
+ inputs.add(5);
+ inputs.add(10);
+ inputs.add(11);
+ inputs.add(11);
+
+ KeySelector<Integer, ?> key = new ModKey(2);
+
+ Timestamp<Integer> ts = new Timestamp<Integer>() {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public long getTimestamp(Integer value) {
+ return value;
+ }
+ };
+
+ StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+ env.disableOperatorChaining();
+
+ DataStream<Integer> source = env.fromCollection(inputs);
+
+ source.window(Time.of(3, ts, 1)).every(Time.of(2, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new TestSink1());
+
+ source.window(Time.of(4, ts, 1)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
+ .flatten().addSink(new TestSink2());
+
+ source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
+ .addSink(new TestSink4());
+
+ source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
+ .mapWindow(new IdentityWindowMap()).flatten().addSink(new TestSink5());
+
+ source.window(Time.of(2, ts)).every(Time.of(3, ts)).min(0).getDiscretizedStream()
+ .addSink(new TestSink3());
+
+ source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
+ .addSink(new TestSink6());
+
+ source.window(Time.of(5, ts, 1)).mapWindow(new IdentityWindowMap()).flatten()
+ .addSink(new TestSink7());
+
+ source.window(Time.of(5, ts, 1)).every(Time.of(4, ts, 1)).groupBy(new ModKey(2)).sum(0)
+ .getDiscretizedStream().addSink(new TestSink8());
+
+ try {
+ source.window(FullStream.window()).every(Count.of(2)).getDiscretizedStream();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.window(FullStream.window()).getDiscretizedStream();
+ fail();
+ } catch (Exception e) {
+ }
+ try {
+ source.every(Count.of(5)).mapWindow(new IdentityWindowMap()).getDiscretizedStream();
+ fail();
+ } catch (Exception e) {
+ }
+
+ source.every(Count.of(4)).sum(0).getDiscretizedStream().addSink(new TestSink11());
+
+ source.window(FullStream.window()).every(Count.of(4)).groupBy(key).sum(0)
+ .getDiscretizedStream().addSink(new TestSink12());
+
+ DataStream<Integer> source2 = env.addSource(new ParallelSourceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ for (int i = 1; i <= 10; i++) {
+ ctx.collect(i);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+ });
+
+ DataStream<Integer> source3 = env.addSource(new RichParallelSourceFunction<Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ private int i = 1;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ i = 1 + getRuntimeContext().getIndexOfThisSubtask();
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+ for (;i < 11; i += 2) {
+ ctx.collect(i);
+ }
+
+ }
+ });
+
+ source2.window(Time.of(2, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink9());
+
+ source3.window(Time.of(5, ts, 1)).groupBy(new ModKey(2)).sum(0).getDiscretizedStream()
+ .addSink(new TestSink10());
+
+ source.map(new MapFunction<Integer, Integer>() {
+ @Override
+ public Integer map(Integer value) throws Exception {
+ return value;
+ }
+ }).every(Time.of(5, ts, 1)).sum(0).getDiscretizedStream().addSink(new TestSink13());
+
+ env.execute();
+
+ // sum ( Time of 3 slide 2 )
+ List<StreamWindow<Integer>> expected1 = new ArrayList<StreamWindow<Integer>>();
+ expected1.add(StreamWindow.fromElements(5));
+ expected1.add(StreamWindow.fromElements(11));
+ expected1.add(StreamWindow.fromElements(9));
+ expected1.add(StreamWindow.fromElements(10));
+ expected1.add(StreamWindow.fromElements(32));
+
+ validateOutput(expected1, TestSink1.windows);
+
+ // Tumbling Time of 4 grouped by mod 2
+ List<StreamWindow<Integer>> expected2 = new ArrayList<StreamWindow<Integer>>();
+ expected2.add(StreamWindow.fromElements(2, 2, 4));
+ expected2.add(StreamWindow.fromElements(1, 3));
+ expected2.add(StreamWindow.fromElements(5));
+ expected2.add(StreamWindow.fromElements(10));
+ expected2.add(StreamWindow.fromElements(11, 11));
+
+ validateOutput(expected2, TestSink2.windows);
+
+ // groupby mod 2 sum ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
+ expected3.add(StreamWindow.fromElements(4));
+ expected3.add(StreamWindow.fromElements(5));
+ expected3.add(StreamWindow.fromElements(22));
+ expected3.add(StreamWindow.fromElements(8));
+ expected3.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected3, TestSink4.windows);
+
+ // groupby mod3 Tumbling Count of 2 grouped by mod 2
+ List<StreamWindow<Integer>> expected4 = new ArrayList<StreamWindow<Integer>>();
+ expected4.add(StreamWindow.fromElements(2, 2));
+ expected4.add(StreamWindow.fromElements(1));
+ expected4.add(StreamWindow.fromElements(4));
+ expected4.add(StreamWindow.fromElements(5, 11));
+ expected4.add(StreamWindow.fromElements(10));
+ expected4.add(StreamWindow.fromElements(11));
+ expected4.add(StreamWindow.fromElements(3));
+
+ validateOutput(expected4, TestSink5.windows);
+
+ // min ( Time of 2 slide 3 )
+ List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
+ expected5.add(StreamWindow.fromElements(1));
+ expected5.add(StreamWindow.fromElements(4));
+ expected5.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected5, TestSink3.windows);
+
+ // groupby mod 2 max ( Tumbling Time of 4)
+ List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
+ expected6.add(StreamWindow.fromElements(3));
+ expected6.add(StreamWindow.fromElements(5));
+ expected6.add(StreamWindow.fromElements(11));
+ expected6.add(StreamWindow.fromElements(4));
+ expected6.add(StreamWindow.fromElements(10));
+
+ validateOutput(expected6, TestSink6.windows);
+
+ List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>();
+ expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4, 5));
+ expected7.add(StreamWindow.fromElements(10));
+ expected7.add(StreamWindow.fromElements(10, 11, 11));
+
+ validateOutput(expected7, TestSink7.windows);
+
+ List<StreamWindow<Integer>> expected8 = new ArrayList<StreamWindow<Integer>>();
+ expected8.add(StreamWindow.fromElements(4, 8));
+ expected8.add(StreamWindow.fromElements(4, 5));
+ expected8.add(StreamWindow.fromElements(10, 22));
+
+ for (List<Integer> sw : TestSink8.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected8, TestSink8.windows);
+
+ List<StreamWindow<Integer>> expected9 = new ArrayList<StreamWindow<Integer>>();
+ expected9.add(StreamWindow.fromElements(6));
+ expected9.add(StreamWindow.fromElements(14));
+ expected9.add(StreamWindow.fromElements(22));
+ expected9.add(StreamWindow.fromElements(30));
+ expected9.add(StreamWindow.fromElements(38));
+
+ validateOutput(expected9, TestSink9.windows);
+
+ List<StreamWindow<Integer>> expected10 = new ArrayList<StreamWindow<Integer>>();
+ expected10.add(StreamWindow.fromElements(6, 9));
+ expected10.add(StreamWindow.fromElements(16, 24));
+
+ for (List<Integer> sw : TestSink10.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected10, TestSink10.windows);
+
+ List<StreamWindow<Integer>> expected11 = new ArrayList<StreamWindow<Integer>>();
+ expected11.add(StreamWindow.fromElements(8));
+ expected11.add(StreamWindow.fromElements(38));
+ expected11.add(StreamWindow.fromElements(49));
+
+ for (List<Integer> sw : TestSink11.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected11, TestSink11.windows);
+
+ List<StreamWindow<Integer>> expected12 = new ArrayList<StreamWindow<Integer>>();
+ expected12.add(StreamWindow.fromElements(4, 4));
+ expected12.add(StreamWindow.fromElements(18, 20));
+ expected12.add(StreamWindow.fromElements(18, 31));
+
+ for (List<Integer> sw : TestSink12.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected12, TestSink12.windows);
+
+ List<StreamWindow<Integer>> expected13 = new ArrayList<StreamWindow<Integer>>();
+ expected13.add(StreamWindow.fromElements(17));
+ expected13.add(StreamWindow.fromElements(27));
+ expected13.add(StreamWindow.fromElements(49));
+
+ for (List<Integer> sw : TestSink13.windows) {
+ Collections.sort(sw);
+ }
+
+ validateOutput(expected13, TestSink13.windows);
+
+ }
+
+ public static <R> void validateOutput(List<R> expected, List<R> actual) {
+ assertEquals(new HashSet<R>(expected), new HashSet<R>(actual));
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink1 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink2 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink3 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink4 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink5 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink6 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink7 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink8 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink9 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink10 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink11 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink12 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ private static class TestSink13 implements SinkFunction<StreamWindow<Integer>> {
+
+ public static List<StreamWindow<Integer>> windows = Collections
+ .synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+ @Override
+ public void invoke(StreamWindow<Integer> value) throws Exception {
+ windows.add(value);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index eb49e26..6e22021 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -48,6 +48,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.InstantiationUtil;
@@ -103,12 +105,13 @@ public class StatefulOperatorTest {
@Test
public void apiTest() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(3, 32);
-
+
KeyedDataStream<Integer> keyedStream = env.fromCollection(Arrays.asList(0, 1, 2, 3, 4, 5, 6)).keyBy(new ModKey(4));
keyedStream.map(new StatefulMapper()).addSink(new SinkFunction<String>() {
private static final long serialVersionUID = 1L;
- public void invoke(String value) throws Exception {}
+ public void invoke(String value) throws Exception {
+ }
});
keyedStream.map(new StatefulMapper2()).setParallelism(1).addSink(new SinkFunction<String>() {
@@ -128,8 +131,8 @@ public class StatefulOperatorTest {
private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception {
for (Integer i : input) {
- map.getRuntimeContext().setNextInput(i);
- map.processElement(i);
+ map.getRuntimeContext().setNextInput(new StreamRecord<Integer>(i, 0L));
+ map.processElement(new StreamRecord<Integer>(i, 0L));
}
}
@@ -144,11 +147,16 @@ public class StatefulOperatorTest {
StreamMap<Integer, String> op = new StreamMap<Integer, String>(new StatefulMapper());
- op.setup(new Output<String>() {
+ op.setup(new Output<StreamRecord<String>>() {
@Override
- public void collect(String record) {
- outputList.add(record);
+ public void collect(StreamRecord<String> record) {
+ outputList.add(record.getValue());
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
index 4ac7fda..317a21c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamtask/MockRecordWriter.java
@@ -40,6 +40,6 @@ public class MockRecordWriter extends RecordWriter<SerializationDelegate<StreamR
@Override
public void emit(SerializationDelegate<StreamRecord<Tuple1<Integer>>> record) {
- emittedRecords.add(record.getInstance().getObject().f0);
+ emittedRecords.add(record.getInstance().getValue().f0);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
index 967c719..6bc0e30 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBufferTest.java
@@ -22,10 +22,10 @@ import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Test;
public class BasicWindowBufferTest {
@@ -33,7 +33,7 @@ public class BasicWindowBufferTest {
@Test
public void testEmitWindow() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
List<StreamWindow<Integer>> collected = collector.getCollected();
WindowBuffer<Integer> wb = new BasicWindowBuffer<Integer>();
@@ -60,13 +60,13 @@ public class BasicWindowBufferTest {
assertEquals(2, collected.size());
}
- public static class TestCollector<T> implements Collector<T> {
+ public static class TestOutput<T> implements Output<StreamRecord<T>> {
private final List<T> collected = new ArrayList<T>();
@Override
- public void collect(T record) {
- collected.add(record);
+ public void collect(StreamRecord<T> record) {
+ collected.add(record.getValue());
}
@Override
@@ -77,6 +77,10 @@ public class BasicWindowBufferTest {
return collected;
}
+ @Override
+ public void emitWatermark(Watermark mark) {
+
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
index c91910b..8430499 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducerTest.java
@@ -32,8 +32,9 @@ import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
+
import org.junit.Test;
public class JumpingCountGroupedPreReducerTest {
@@ -58,7 +59,7 @@ public class JumpingCountGroupedPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(1, -2));
inputs.add(new Tuple2<Integer, Integer>(100, -200));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -109,7 +110,7 @@ public class JumpingCountGroupedPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(1, -2));
inputs.add(new Tuple2<Integer, Integer>(100, -200));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountGroupedPreReducer<Tuple2<Integer, Integer>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
index ba890ab..2279264 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class JumpingCountPreReducerTest {
@@ -48,7 +48,7 @@ public class JumpingCountPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(4, -2));
inputs.add(new Tuple2<Integer, Integer>(5, -3));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new JumpingCountPreReducer<Tuple2<Integer, Integer>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
index 5b693e7..ce312d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducerTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class JumpingTimePreReducerTest {
@@ -39,7 +39,7 @@ public class JumpingTimePreReducerTest {
@Test
public void testEmitWindow() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
List<StreamWindow<Integer>> collected = collector.getCollected();
WindowBuffer<Integer> wb = new JumpingTimePreReducer<Integer>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
index 377bdb5..7f58527 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountGroupedPreReducerTest.java
@@ -1,34 +1,35 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
package org.apache.flink.streaming.api.windowing.windowbuffer;
-import static org.apache.flink.streaming.api.windowing.windowbuffer.SlidingTimeGroupedPreReducerTest.checkResults;
+import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class SlidingCountGroupedPreReducerTest {
@@ -37,11 +38,11 @@ public class SlidingCountGroupedPreReducerTest {
ReduceFunction<Integer> reducer = new SumReducer();
- KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+ KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
@Test
public void testPreReduce1() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
reducer, serializer, key, 3, 2, 0);
@@ -84,7 +85,7 @@ public class SlidingCountGroupedPreReducerTest {
@Test
public void testPreReduce2() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
reducer, serializer, key, 5, 2, 0);
@@ -126,7 +127,7 @@ public class SlidingCountGroupedPreReducerTest {
@Test
public void testPreReduce3() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
reducer, serializer, key, 6, 3, 0);
@@ -163,7 +164,7 @@ public class SlidingCountGroupedPreReducerTest {
@Test
public void testPreReduce4() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountGroupedPreReducer<Integer> preReducer = new SlidingCountGroupedPreReducer<Integer>(
reducer, serializer, key, 5, 1, 2);
@@ -217,4 +218,18 @@ public class SlidingCountGroupedPreReducerTest {
}
+
+ protected static void checkResults(List<StreamWindow<Integer>> expected,
+ List<StreamWindow<Integer>> actual) {
+
+ for (StreamWindow<Integer> sw : expected) {
+ Collections.sort(sw);
+ }
+
+ for (StreamWindow<Integer> sw : actual) {
+ Collections.sort(sw);
+ }
+
+ assertEquals(expected, actual);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
index 3ce65f1..156b875 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingCountPreReducerTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class SlidingCountPreReducerTest {
@@ -37,7 +37,7 @@ public class SlidingCountPreReducerTest {
@Test
public void testPreReduce1() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
serializer, 3, 2, 0);
@@ -80,7 +80,7 @@ public class SlidingCountPreReducerTest {
@Test
public void testPreReduce2() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
serializer, 5, 2, 0);
@@ -122,7 +122,7 @@ public class SlidingCountPreReducerTest {
@Test
public void testPreReduce3() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
serializer, 6, 3, 0);
@@ -159,7 +159,7 @@ public class SlidingCountPreReducerTest {
@Test
public void testPreReduce4() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingCountPreReducer<Integer> preReducer = new SlidingCountPreReducer<Integer>(reducer,
serializer, 5, 1, 2);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
index 3f1cba1..68bceda 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimeGroupedPreReducerTest.java
@@ -31,11 +31,11 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
-import org.apache.flink.streaming.api.operators.windowing.WindowIntegrationTest;
+import org.apache.flink.streaming.api.operators.windowing.WindowingITCase;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class SlidingTimeGroupedPreReducerTest {
@@ -48,7 +48,7 @@ public class SlidingTimeGroupedPreReducerTest {
ReduceFunction<Tuple2<Integer, Integer>> tupleReducer = new TupleSumReducer();
- KeySelector<Integer, ?> key = new WindowIntegrationTest.ModKey(2);
+ KeySelector<Integer, ?> key = new WindowingITCase.ModKey(2);
KeySelector<Tuple2<Integer, Integer>, ?> tupleKey = new TupleModKey(2);
@Test
@@ -58,7 +58,7 @@ public class SlidingTimeGroupedPreReducerTest {
// replaying the same sequence of elements with a later timestamp and expecting the same
// result.
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimeGroupedPreReducer<Tuple2<Integer, Integer>>(tupleReducer,
tupleType.createSerializer(new ExecutionConfig()), tupleKey, 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -190,7 +190,7 @@ public class SlidingTimeGroupedPreReducerTest {
@Test
public void testPreReduce2() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
reducer, serializer, key, 5, 2, new TimestampWrapper<Integer>(
@@ -241,7 +241,7 @@ public class SlidingTimeGroupedPreReducerTest {
@Test
public void testPreReduce3() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
reducer, serializer, key, 6, 3, new TimestampWrapper<Integer>(
@@ -287,7 +287,7 @@ public class SlidingTimeGroupedPreReducerTest {
@Test
public void testPreReduce4() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimeGroupedPreReducer<Integer> preReducer = new SlidingTimeGroupedPreReducer<Integer>(
reducer, serializer, key, 3, 2, new TimestampWrapper<Integer>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
index 0519da7..6a36c57 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingTimePreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class SlidingTimePreReducerTest {
@@ -50,7 +50,7 @@ public class SlidingTimePreReducerTest {
// replaying the same sequence of elements with a later timestamp and expecting the same
// result.
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
SlidingTimePreReducer<Tuple2<Integer, Integer>> preReducer = new SlidingTimePreReducer<Tuple2<Integer, Integer>>(tupleReducer,
tupleType.createSerializer(new ExecutionConfig()), 3, 2, new TimestampWrapper<Tuple2<Integer, Integer>>(new Timestamp<Tuple2<Integer, Integer>>() {
@@ -145,7 +145,7 @@ public class SlidingTimePreReducerTest {
@Test
public void testPreReduce2() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
serializer, 5, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -195,7 +195,7 @@ public class SlidingTimePreReducerTest {
@Test
public void testPreReduce3() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
serializer, 6, 3, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
@@ -240,7 +240,7 @@ public class SlidingTimePreReducerTest {
@Test
public void testPreReduce4() throws Exception {
- TestCollector<StreamWindow<Integer>> collector = new TestCollector<StreamWindow<Integer>>();
+ TestOutput<StreamWindow<Integer>> collector = new TestOutput<StreamWindow<Integer>>();
SlidingTimePreReducer<Integer> preReducer = new SlidingTimePreReducer<Integer>(reducer,
serializer, 3, 2, new TimestampWrapper<Integer>(new Timestamp<Integer>() {
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
index c5107bf..3aee288 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducerTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
@@ -57,7 +57,7 @@ public class TumblingGroupedPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(1, -1));
inputs.add(new Tuple2<Integer, Integer>(1, -2));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
@@ -104,7 +104,7 @@ public class TumblingGroupedPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(1, -1));
inputs.add(new Tuple2<Integer, Integer>(1, -2));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingGroupedPreReducer<Tuple2<Integer, Integer>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
index b8de02e..3e537a5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducerTest.java
@@ -27,9 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.TumblingPreReducer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
import org.junit.Test;
public class TumblingPreReducerTest {
@@ -49,7 +47,7 @@ public class TumblingPreReducerTest {
inputs.add(new Tuple2<Integer, Integer>(3, -1));
inputs.add(new Tuple2<Integer, Integer>(4, -2));
- TestCollector<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestCollector<StreamWindow<Tuple2<Integer, Integer>>>();
+ TestOutput<StreamWindow<Tuple2<Integer, Integer>>> collector = new TestOutput<StreamWindow<Tuple2<Integer, Integer>>>();
List<StreamWindow<Tuple2<Integer, Integer>>> collected = collector.getCollected();
WindowBuffer<Tuple2<Integer, Integer>> wb = new TumblingPreReducer<Tuple2<Integer, Integer>>(
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
index 3f8401d..d8a3696 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
import org.junit.Test;
public class BarrierBufferIOTest {
@@ -55,7 +54,7 @@ public class BarrierBufferIOTest {
if (boe.isBuffer()) {
boe.getBuffer().recycle();
} else {
- barrierBuffer.processSuperstep(boe);
+ barrierBuffer.processBarrier(boe);
}
}
// System.out.println("Ran for " + (System.currentTimeMillis() -
@@ -101,14 +100,14 @@ public class BarrierBufferIOTest {
private int numChannels;
private BufferPool[] bufferPools;
- private int[] currentSupersteps;
+ private int[] currentBarriers;
BarrierGenerator[] barrierGens;
int currentChannel = 0;
long c = 0;
public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
this.numChannels = bufferPools.length;
- this.currentSupersteps = new int[numChannels];
+ this.currentBarriers = new int[numChannels];
this.bufferPools = bufferPools;
this.barrierGens = barrierGens;
}
@@ -132,7 +131,7 @@ public class BarrierBufferIOTest {
currentChannel = (currentChannel + 1) % numChannels;
if (barrierGens[currentChannel].isNextBarrier()) {
- return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+ return BarrierBufferTest.createBarrier(++currentBarriers[currentChannel],
currentChannel);
} else {
Buffer buffer = bufferPools[currentChannel].requestBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 89ec7dc..cb5e046 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
import org.junit.Test;
@@ -67,10 +67,10 @@ public class BarrierBufferTest {
List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
input.add(createBuffer(0));
input.add(createBuffer(0));
- input.add(createSuperstep(1, 0));
+ input.add(createBarrier(1, 0));
input.add(createBuffer(0));
input.add(createBuffer(0));
- input.add(createSuperstep(2, 0));
+ input.add(createBarrier(2, 0));
input.add(createBuffer(0));
InputGate mockIG = new MockInputGate(1, input);
@@ -82,11 +82,11 @@ public class BarrierBufferTest {
assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
bb.cleanup();
@@ -98,18 +98,18 @@ public class BarrierBufferTest {
List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
input.add(createBuffer(0));
input.add(createBuffer(1));
- input.add(createSuperstep(1, 0));
- input.add(createSuperstep(2, 0));
+ input.add(createBarrier(1, 0));
+ input.add(createBarrier(2, 0));
input.add(createBuffer(0));
- input.add(createSuperstep(3, 0));
+ input.add(createBarrier(3, 0));
input.add(createBuffer(0));
input.add(createBuffer(1));
- input.add(createSuperstep(1, 1));
+ input.add(createBarrier(1, 1));
input.add(createBuffer(0));
input.add(createBuffer(1));
- input.add(createSuperstep(2, 1));
- input.add(createSuperstep(3, 1));
- input.add(createSuperstep(4, 0));
+ input.add(createBarrier(2, 1));
+ input.add(createBarrier(3, 1));
+ input.add(createBarrier(4, 0));
input.add(createBuffer(0));
input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
@@ -123,24 +123,24 @@ public class BarrierBufferTest {
check(input.get(0), nextBoe = bb.getNextNonBlocked());
check(input.get(1), nextBoe = bb.getNextNonBlocked());
check(input.get(2), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(7), nextBoe = bb.getNextNonBlocked());
check(input.get(8), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(3), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(10), nextBoe = bb.getNextNonBlocked());
check(input.get(11), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(4), nextBoe = bb.getNextNonBlocked());
check(input.get(5), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(12), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(6), nextBoe = bb.getNextNonBlocked());
check(input.get(9), nextBoe = bb.getNextNonBlocked());
check(input.get(13), nextBoe = bb.getNextNonBlocked());
- bb.processSuperstep(nextBoe);
+ bb.processBarrier(nextBoe);
check(input.get(14), nextBoe = bb.getNextNonBlocked());
check(input.get(15), nextBoe = bb.getNextNonBlocked());
@@ -206,8 +206,8 @@ public class BarrierBufferTest {
}
}
- protected static BufferOrEvent createSuperstep(long id, int channel) {
- return new BufferOrEvent(new StreamingSuperstep(id, System.currentTimeMillis()), channel);
+ protected static BufferOrEvent createBarrier(long id, int channel) {
+ return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
}
protected static BufferOrEvent createBuffer(int channel) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
deleted file mode 100644
index 528829d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.io.BarrierBuffer;
-import org.apache.flink.streaming.runtime.io.CoRecordReader;
-import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate;
-import org.junit.Test;
-
-public class CoRecordReaderTest {
-
- @Test
- public void test() throws InterruptedException, IOException {
-
- List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
- input1.add(BarrierBufferTest.createBuffer(0));
- input1.add(BarrierBufferTest.createSuperstep(1, 0));
- input1.add(BarrierBufferTest.createBuffer(0));
-
- InputGate ig1 = new MockInputGate(1, input1);
-
- List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
- input2.add(BarrierBufferTest.createBuffer(0));
- input2.add(BarrierBufferTest.createBuffer(0));
- input2.add(BarrierBufferTest.createSuperstep(1, 0));
- input2.add(BarrierBufferTest.createBuffer(0));
-
- InputGate ig2 = new MockInputGate(1, input2);
-
- CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
- ig1, ig2);
- BarrierBuffer b1 = coReader.barrierBuffer1;
- BarrierBuffer b2 = coReader.barrierBuffer2;
-
- coReader.addToAvailable(ig1);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig1);
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
- b1.processSuperstep(input1.get(1));
-
- coReader.addToAvailable(ig1);
- coReader.addToAvailable(ig2);
- coReader.addToAvailable(ig2);
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
- b2.processSuperstep(input2.get(2));
-
- assertEquals(1, coReader.getNextReaderIndexBlocking());
- b1.getNextNonBlocked();
-
- assertEquals(2, coReader.getNextReaderIndexBlocking());
- b2.getNextNonBlocked();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
index aa4d24a..a1cea13 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -32,7 +31,7 @@ public class BroadcastPartitionerTest {
private BroadcastPartitioner<Tuple> broadcastPartitioner2;
private BroadcastPartitioner<Tuple> broadcastPartitioner3;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
@Before
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
index b37e43a..2643bba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
public class DistributePartitionerTest {
private RebalancePartitioner<Tuple> distributePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
index 94d29ac..05541f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
@@ -21,34 +21,28 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
public class FieldsPartitionerTest {
- private FieldsPartitioner<Tuple> fieldsPartitioner;
- private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
- .setObject(new Tuple2<String, Integer>("test", 0));
- private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
- .setObject(new Tuple2<String, Integer>("test", 42));
- private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
- private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
- null);
+ private FieldsPartitioner<Tuple2<String, Integer>> fieldsPartitioner;
+ private StreamRecord<Tuple2<String, Integer>> streamRecord1 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 0));
+ private StreamRecord<Tuple2<String, Integer>> streamRecord2 = new StreamRecord<Tuple2<String, Integer>>(new Tuple2<String, Integer>("test", 42));
+ private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd1 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
+ private SerializationDelegate<StreamRecord<Tuple2<String, Integer>>> sd2 = new SerializationDelegate<StreamRecord<Tuple2<String, Integer>>>(null);
@Before
public void setPartitioner() {
- fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
+ fieldsPartitioner = new FieldsPartitioner<Tuple2<String, Integer>>(new KeySelector<Tuple2<String, Integer>, String>() {
private static final long serialVersionUID = 1L;
@Override
- public String getKey(Tuple value) throws Exception {
+ public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.getField(0);
}
});