You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/02/28 13:13:42 UTC
[1/2] flink git commit: [FLINK-5157] [streaming] Introduce
ProcessAllWindowFunction
Repository: flink
Updated Branches:
refs/heads/master 87b907736 -> 788b83921
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index b6c1618..34eac9e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
@@ -383,6 +384,119 @@ public class AllWindowTranslationTest {
processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcessWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(reducer, new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithProcessWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .reduce(new DummyReducer(), new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(new Tuple3<>(in.f0, in.f0, in.f1));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ReducingStateDescriptor);
+
+ processElementAndEnsureOutput(operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testReduceWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .reduce(
+ reducer,
+ new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ @Override
+ public void process(
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
/**
* Test for the deprecated .apply(Reducer, WindowFunction).
*/
@@ -540,6 +654,226 @@ public class AllWindowTranslationTest {
operator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
}
+ @Test
+ public void testAggregateWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(new DummyAggregationFunction());
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ public void testAggregateWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .aggregate(
+ new DummyAggregationFunction(),
+ new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ @Override
+ public void process(
+ Context context,
+ Iterable<Tuple2<String, Integer>> elements,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator =
+ (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(
+ winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ // ------------------------------------------------------------------------
+ // process() translation tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessWithEvictor() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
+ .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof TimeEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testProcessWithCustomTrigger() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .trigger(CountTrigger.of(1))
+ .process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple2<String, Integer> in : values) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof CountTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+
// ------------------------------------------------------------------------
// fold() translation tests
// ------------------------------------------------------------------------
@@ -666,6 +1000,117 @@ public class AllWindowTranslationTest {
@Test
@SuppressWarnings("rawtypes")
+ public void testFoldWithProcessAllWindowFunctionEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testFoldWithProcessAllWindowFunctionProcessingTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple2<String, Integer>> window = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .fold(new Tuple3<>("", "empty", 0), new DummyFolder(), new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(
+ Context ctx,
+ Iterable<Tuple3<String, String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : values) {
+ out.collect(new Tuple2<>(in.f0, in.f2));
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof WindowOperator);
+ WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?> winOperator = (WindowOperator<String, Tuple2<String, Integer>, ?, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof ProcessingTimeTrigger);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof FoldingStateDescriptor);
+
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public void testFoldWithEvictorAndProcessFunction() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+
+ DataStream<Tuple3<String, String, Integer>> window1 = source
+ .windowAll(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(CountEvictor.of(100))
+ .fold(
+ new Tuple3<>("", "", 1),
+ new DummyFolder(),
+ new ProcessAllWindowFunction<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>, TimeWindow>() {
+ @Override
+ public void process(
+ Context context,
+ Iterable<Tuple3<String, String, Integer>> elements,
+ Collector<Tuple3<String, String, Integer>> out) throws Exception {
+ for (Tuple3<String, String, Integer> in : elements) {
+ out.collect(in);
+ }
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>> transform =
+ (OneInputTransformation<Tuple2<String, Integer>, Tuple3<String, String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple3<String, String, Integer>> operator = transform.getOperator();
+ Assert.assertTrue(operator instanceof EvictingWindowOperator);
+ EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?> winOperator = (EvictingWindowOperator<String, Tuple2<String, Integer>, ?, ?>) operator;
+ Assert.assertTrue(winOperator.getTrigger() instanceof EventTimeTrigger);
+ Assert.assertTrue(winOperator.getEvictor() instanceof CountEvictor);
+ Assert.assertTrue(winOperator.getWindowAssigner() instanceof SlidingEventTimeWindows);
+ Assert.assertTrue(winOperator.getStateDescriptor() instanceof ListStateDescriptor);
+
+ winOperator.setOutputType((TypeInformation) window1.getType(), new ExecutionConfig());
+ processElementAndEnsureOutput(winOperator, winOperator.getKeySelector(), BasicTypeInfo.STRING_TYPE_INFO, new Tuple2<>("hello", 1));
+ }
+
+ @Test
+ @SuppressWarnings("rawtypes")
public void testApplyWithPreFolderEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index cf062fc..694353c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -18,20 +18,19 @@
package org.apache.flink.streaming.api.scala
-import org.apache.flink.annotation.{PublicEvolving, Public}
+import org.apache.flink.annotation.{Public, PublicEvolving}
import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.scala.function.AllWindowFunction
-import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaFoldFunction, ScalaProcessAllWindowFunctionWrapper, ScalaReduceFunction}
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
-
import org.apache.flink.util.Preconditions.checkNotNull
/**
@@ -199,6 +198,64 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param windowFunction The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def reduce[R: TypeInformation](
+ preAggregator: ReduceFunction[T],
+ windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+ val cleanedReducer = clean(preAggregator)
+ val cleanedWindowFunction = clean(windowFunction)
+
+ val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction)
+
+ val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+ asScalaStream(javaStream.reduce(cleanedReducer, applyFunction, returnType))
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation reducer.
+ *
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param windowFunction The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def reduce[R: TypeInformation](
+ preAggregator: (T, T) => T,
+ windowFunction: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+ if (preAggregator == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ if (windowFunction == null) {
+ throw new NullPointerException("WindowApply function must not be null.")
+ }
+
+ val cleanReducer = clean(preAggregator)
+ val cleanWindowFunction = clean(windowFunction)
+
+ val reducer = new ScalaReduceFunction[T](cleanReducer)
+ val applyFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanWindowFunction)
+
+ val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+ asScalaStream(javaStream.reduce(reducer, applyFunction, returnType))
+ }
+
// --------------------------- aggregate() ----------------------------------
/**
@@ -257,6 +314,39 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
/**
* Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given aggregation function.
+ *
+ * @param preAggregator The aggregation function that is used for pre-aggregation
+ * @param windowFunction The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def aggregate[ACC: TypeInformation, V: TypeInformation, R: TypeInformation]
+ (preAggregator: AggregateFunction[T, ACC, V],
+ windowFunction: ProcessAllWindowFunction[V, R, W]): DataStream[R] = {
+
+ checkNotNull(preAggregator, "AggregationFunction must not be null")
+ checkNotNull(windowFunction, "Window function must not be null")
+
+ val cleanedPreAggregator = clean(preAggregator)
+ val cleanedWindowFunction = clean(windowFunction)
+
+ val applyFunction = new ScalaProcessAllWindowFunctionWrapper[V, R, W](cleanedWindowFunction)
+
+ val accumulatorType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+ val aggregationResultType: TypeInformation[V] = implicitly[TypeInformation[V]]
+ val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ asScalaStream(javaStream.aggregate(
+ cleanedPreAggregator, applyFunction,
+ accumulatorType, aggregationResultType, resultType))
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
* evaluation of the window. The output of the window function is
* interpreted as a regular non-windowed stream.
*
@@ -367,6 +457,37 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
*
* @param initialValue Initial value of the fold
* @param preAggregator The reduce function that is used for pre-aggregation
+ * @param windowFunction The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def fold[ACC: TypeInformation, R: TypeInformation](
+ initialValue: ACC,
+ preAggregator: FoldFunction[T, ACC],
+ windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+ val cleanFolder = clean(preAggregator)
+ val cleanWindowFunction = clean(windowFunction)
+
+ val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction)
+
+ asScalaStream(javaStream.fold(
+ initialValue,
+ cleanFolder,
+ applyFunction,
+ implicitly[TypeInformation[ACC]],
+ implicitly[TypeInformation[R]]))
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation folder.
+ *
+ * @param initialValue Initial value of the fold
+ * @param preAggregator The reduce function that is used for pre-aggregation
* @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
@@ -393,6 +514,42 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType))
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Arriving data is pre-aggregated using the given pre-aggregation folder.
+ *
+ * @param initialValue Initial value of the fold
+ * @param preAggregator The reduce function that is used for pre-aggregation
+ * @param windowFunction The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def fold[ACC: TypeInformation, R: TypeInformation](
+ initialValue: ACC,
+ preAggregator: (ACC, T) => ACC,
+ windowFunction: ProcessAllWindowFunction[ACC, R, W]): DataStream[R] = {
+
+ if (preAggregator == null) {
+ throw new NullPointerException("Reduce function must not be null.")
+ }
+ if (windowFunction == null) {
+ throw new NullPointerException("WindowApply function must not be null.")
+ }
+
+ val cleanFolder = clean(preAggregator)
+ val cleanWindowFunction = clean(windowFunction)
+
+ val folder = new ScalaFoldFunction[T, ACC](cleanFolder)
+ val applyFunction = new ScalaProcessAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction)
+
+ val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]]
+ val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+ asScalaStream(javaStream.fold(initialValue, folder, applyFunction, accType, returnType))
+ }
+
// ---------------------------- apply() -------------------------------------
/**
@@ -403,6 +560,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* Not that this function requires that all data in the windows is buffered until the window
* is evaluated, as the function provides no means of pre-aggregation.
*
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ def process[R: TypeInformation](
+ function: ProcessAllWindowFunction[T, R, W]): DataStream[R] = {
+
+ val cleanedFunction = clean(function)
+ val javaFunction = new ScalaProcessAllWindowFunctionWrapper[T, R, W](cleanedFunction)
+
+ asScalaStream(javaStream.process(javaFunction, implicitly[TypeInformation[R]]))
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * Not that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of pre-aggregation.
+ *
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
new file mode 100644
index 0000000..163117b
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.io.Serializable
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.functions.Function
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+/**
+ * Base abstract class for functions that are evaluated over keyed (grouped)
+ * windows using a context for retrieving extra information.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam W The type of the window.
+ */
+@PublicEvolving
+abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
+
+ /**
+ * The context holding window metadata
+ */
+ abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ def window: W
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..22d64a8
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichProcessAllWindowFunction.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.function
+
+import java.beans.Transient
+
+import org.apache.flink.annotation.Public
+import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Base abstract class for functions that are evaluated over
+ * keyed (grouped) windows using a context for retrieving extra information.
+ *
+ * @tparam IN The type of the input value.
+ * @tparam OUT The type of the output value.
+ * @tparam W The type of the window.
+ */
+@Public
+abstract class RichProcessAllWindowFunction[IN, OUT, W <: Window]
+ extends ProcessAllWindowFunction[IN, OUT, W]
+ with RichFunction {
+
+ @Transient
+ private var runtimeContext: RuntimeContext = null
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime context access
+ // --------------------------------------------------------------------------------------------
+
+ override def setRuntimeContext(t: RuntimeContext) {
+ this.runtimeContext = t
+ }
+
+ override def getRuntimeContext: RuntimeContext = {
+ if (this.runtimeContext != null) {
+ this.runtimeContext
+ }
+ else {
+ throw new IllegalStateException("The runtime context has not been initialized.")
+ }
+ }
+
+ override def getIterationRuntimeContext: IterationRuntimeContext = {
+ if (this.runtimeContext == null) {
+ throw new IllegalStateException("The runtime context has not been initialized.")
+ }
+ else {
+ this.runtimeContext match {
+ case iterationRuntimeContext: IterationRuntimeContext => iterationRuntimeContext
+ case _ =>
+ throw new IllegalStateException("This stub is not part of an iteration step function.")
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Default life cycle methods
+ // --------------------------------------------------------------------------------------------
+
+ @throws[Exception]
+ override def open(parameters: Configuration) {
+ }
+
+ @throws[Exception]
+ override def close() {
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index 23293a6..a4fec64 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -18,8 +18,16 @@
package org.apache.flink.streaming.api.scala.function.util
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
-import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
+import org.apache.flink.streaming.api.functions.windowing.{RichProcessWindowFunction => JRichProcessWindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.{RichProcessAllWindowFunction => JRichProcessAllWindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{RichProcessWindowFunction => ScalaRichProcessWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{RichProcessAllWindowFunction => ScalaRichProcessAllWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -34,8 +42,8 @@ import scala.collection.JavaConverters._
* - Java WindowFunction: java.lang.Iterable
*/
final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
- private[this] val func: ProcessWindowFunction[IN, OUT, KEY, W])
- extends JProcessWindowFunction[IN, OUT, KEY, W] {
+ private[this] val func: ScalaProcessWindowFunction[IN, OUT, KEY, W])
+ extends JRichProcessWindowFunction[IN, OUT, KEY, W] {
override def process(
key: KEY,
@@ -47,4 +55,75 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
}
func.process(key, ctx, elements.asScala, out)
}
+
+ override def setRuntimeContext(t: RuntimeContext): Unit = {
+ super.setRuntimeContext(t)
+ func match {
+ case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.setRuntimeContext(t)
+ case _ =>
+ }
+ }
+
+ override def open(parameters: Configuration): Unit = {
+ super.open(parameters)
+ func match {
+ case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.open(parameters)
+ case _ =>
+ }
+ }
+
+ override def close(): Unit = {
+ super.close()
+ func match {
+ case rfunc: ScalaRichProcessWindowFunction[IN, OUT, KEY, W] => rfunc.close()
+ case _ =>
+ }
+ }
+}
+
+/**
+ * A wrapper function that exposes a Scala ProcessWindowFunction
+ * as a ProcessWindowFunction function.
+ *
+ * The Scala and Java Window functions differ in their type of "Iterable":
+ * - Scala WindowFunction: scala.Iterable
+ * - Java WindowFunction: java.lang.Iterable
+ */
+final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
+ private[this] val func: ScalaProcessAllWindowFunction[IN, OUT, W])
+ extends JRichProcessAllWindowFunction[IN, OUT, W] {
+
+ override def process(
+ context: JProcessAllWindowFunction[IN, OUT, W]#Context,
+ elements: java.lang.Iterable[IN],
+ out: Collector[OUT]): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+ }
+ func.process(ctx, elements.asScala, out)
+ }
+
+ override def setRuntimeContext(t: RuntimeContext): Unit = {
+ super.setRuntimeContext(t)
+ func match {
+ case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.setRuntimeContext(t)
+ case _ =>
+ }
+ }
+
+ override def open(parameters: Configuration): Unit = {
+ super.open(parameters)
+ func match {
+ case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.open(parameters)
+ case _ =>
+ }
+ }
+
+ override def close(): Unit = {
+ super.close()
+ func match {
+ case rfunc : ScalaRichProcessAllWindowFunction[IN, OUT, W] => rfunc.close()
+ case _ =>
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 7e067a0..ee9f50c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
-import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction}
+import org.apache.flink.streaming.api.scala.function.{AllWindowFunction, ProcessAllWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor
@@ -356,6 +356,85 @@ class AllWindowTranslationTest {
}
@Test
+ def testReduceWithProcessWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(context: Context,
+ elements: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = {
+ elements foreach ( x => out.collect(x))
+ }
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testReduceWithProcessWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+ .reduce(
+ new DummyReducer, new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach ( x => out.collect(x))
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ReducingStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testApplyWithPreReducerEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -566,6 +645,72 @@ class AllWindowTranslationTest {
}
@Test
+ def testAggregateWithProcessWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
+ .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testAggregateWithProcessWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
+ .aggregate(new DummyAggregator(), new TestProcessAllWindowFunction())
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[AggregatingStateDescriptor[_, _, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testAggregateWithWindowFunctionEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -789,6 +934,88 @@ class AllWindowTranslationTest {
}
@Test
+ def testFoldWithProcessWindowFunctionEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(
+ ("", "", 1),
+ new DummyFolder,
+ new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testFoldWithProcessWindowFunctionProcessingTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .fold(
+ ("", "", 1),
+ new DummyFolder,
+ new ProcessAllWindowFunction[(String, String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._3))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[FoldingStateDescriptor[_, _]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testApplyWithPreFolderEventTime() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -951,6 +1178,84 @@ class AllWindowTranslationTest {
}
@Test
+ def testProcessEventTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .process(
+ new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
+ def testProcessProcessingTimeTime() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .process(
+ new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[ProcessingTimeTrigger])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testApplyEventTimeWithScalaFunction() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1095,6 +1400,46 @@ class AllWindowTranslationTest {
}
@Test
+ def testProcessWithCustomTrigger() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .trigger(CountTrigger.of(1))
+ .process(
+ new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[WindowOperator[_, _, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[WindowOperator[String, (String, Int), _, (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[CountTrigger[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
+
+ @Test
def testReduceWithEvictor() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
@@ -1211,6 +1556,46 @@ class AllWindowTranslationTest {
("hello", 1))
}
+ @Test
+ def testProcessWithEvictor() {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
+
+ val source = env.fromElements(("hello", 1), ("hello", 2))
+
+ val window1 = source
+ .windowAll(TumblingEventTimeWindows.of(Time.seconds(1), Time.milliseconds(100)))
+ .evictor(CountEvictor.of(100))
+ .process(
+ new ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = input foreach {x => out.collect((x._1, x._2))}
+ })
+
+ val transform = window1
+ .javaStream
+ .getTransformation
+ .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]
+
+ val operator = transform.getOperator
+ assertTrue(operator.isInstanceOf[EvictingWindowOperator[_, _, _, _ <: Window]])
+
+ val winOperator = operator
+ .asInstanceOf[EvictingWindowOperator[String, (String, Int), (String, Int), _ <: Window]]
+
+ assertTrue(winOperator.getTrigger.isInstanceOf[EventTimeTrigger])
+ assertTrue(winOperator.getEvictor.isInstanceOf[CountEvictor[_]])
+ assertTrue(winOperator.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
+ assertTrue(winOperator.getStateDescriptor.isInstanceOf[ListStateDescriptor[_]])
+
+ processElementAndEnsureOutput[String, (String, Int), (String, Int)](
+ winOperator,
+ winOperator.getKeySelector,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ ("hello", 1))
+ }
/**
* Ensure that we get some output from the given operator when pushing in an element and
@@ -1218,10 +1603,10 @@ class AllWindowTranslationTest {
*/
@throws[Exception]
private def processElementAndEnsureOutput[K, IN, OUT](
- operator: OneInputStreamOperator[IN, OUT],
- keySelector: KeySelector[IN, K],
- keyType: TypeInformation[K],
- element: IN) {
+ operator: OneInputStreamOperator[IN, OUT],
+ keySelector: KeySelector[IN, K],
+ keyType: TypeInformation[K],
+ element: IN) {
val testHarness =
new KeyedOneInputStreamOperatorTestHarness[K, IN, OUT](operator, keySelector, keyType)
@@ -1243,7 +1628,8 @@ class AllWindowTranslationTest {
}
}
-class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+class TestAllWindowFunction
+ extends AllWindowFunction[(String, Int), (String, Int), TimeWindow] {
override def apply(
window: TimeWindow,
@@ -1253,3 +1639,15 @@ class TestAllWindowFunction extends AllWindowFunction[(String, Int), (String, In
input.foreach(out.collect)
}
}
+
+class TestProcessAllWindowFunction
+ extends ProcessAllWindowFunction[(String, Int), (String, Int), TimeWindow] {
+
+ override def process(
+ context: Context,
+ input: Iterable[(String, Int)],
+ out: Collector[(String, Int)]): Unit = {
+
+ input.foreach(out.collect)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
index a23145c..dc38758 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
@@ -150,7 +150,6 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
}
@Test
- @Ignore
def testFoldWithProcessWindowFunction(): Unit = {
WindowFoldITCase.testResults = mutable.MutableList()
CheckingIdentityRichProcessWindowFunction.reset()
@@ -310,6 +309,63 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
}
+
+ @Test
+ def testFoldAllWithProcessWindowFunction(): Unit = {
+ WindowFoldITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessAllWindowFunction.reset()
+
+ val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
+ override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
+ (accumulator._1 + value._1, accumulator._2 + value._2)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("a", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("a", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 5))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
+
+ source1
+ .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(
+ ("R:", 0),
+ foldFunc,
+ new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowFoldITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Fold All-Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(R:aaa,3)",
+ "(R:bababa,24)")
+
+ assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
index bfbe6ee..eb9f361 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
@@ -87,7 +87,6 @@ class WindowFunctionITCase {
}
@Test
- @Ignore
def testRichProcessWindowFunction(): Unit = {
WindowFunctionITCase.testResults = mutable.MutableList()
CheckingIdentityRichProcessWindowFunction.reset()
@@ -183,6 +182,54 @@ class WindowFunctionITCase {
CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
}
+
+ @Test
+ def testRichProcessAllWindowFunction(): Unit = {
+ WindowFunctionITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessAllWindowFunction.reset()
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 6))
+ ctx.collect(("a", 7))
+ ctx.collect(("a", 8))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {}
+
+ }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
+
+ source1
+ .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .process(new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowFunctionITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("RichAllWindowFunction Test")
+
+ val expectedResult = mutable.MutableList(
+ "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
+ "(b,3)", "(b,4)", "(b,5)")
+
+ assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+ }
}
object WindowFunctionITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index 5418108..ee1dbfd 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
+import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichProcessAllWindowFunction, CheckingIdentityRichProcessWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
@@ -149,7 +149,6 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
}
@Test
- @Ignore
def testReduceWithProcessWindowFunction(): Unit = {
WindowReduceITCase.testResults = mutable.MutableList()
CheckingIdentityRichProcessWindowFunction.reset()
@@ -307,6 +306,62 @@ class WindowReduceITCase extends StreamingMultipleProgramsTestBase {
CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
}
+
+ @Test
+ def testReduceAllWithProcessWindowFunction(): Unit = {
+ WindowReduceITCase.testResults = mutable.MutableList()
+ CheckingIdentityRichProcessAllWindowFunction.reset()
+
+ val reduceFunc = new ReduceFunction[(String, Int)] {
+ override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+ (a._1 + b._1, a._2 + b._2)
+ }
+ }
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+
+ val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+ def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+ ctx.collect(("a", 0))
+ ctx.collect(("a", 1))
+ ctx.collect(("a", 2))
+ ctx.collect(("b", 3))
+ ctx.collect(("a", 3))
+ ctx.collect(("b", 4))
+ ctx.collect(("a", 4))
+ ctx.collect(("b", 5))
+ ctx.collect(("a", 5))
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ def cancel() {
+ }
+ }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+ source1
+ .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .reduce(
+ reduceFunc,
+ new CheckingIdentityRichProcessAllWindowFunction[(String, Int), TimeWindow]())
+ .addSink(new SinkFunction[(String, Int)]() {
+ def invoke(value: (String, Int)) {
+ WindowReduceITCase.testResults += value.toString
+ }
+ })
+
+ env.execute("Fold All-Window Test")
+
+ val expectedResult = mutable.MutableList(
+ "(aaa,3)",
+ "(bababa,24)")
+
+ assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+ CheckingIdentityRichProcessAllWindowFunction.checkRichMethodCalls()
+ }
}
object WindowReduceITCase {
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
new file mode 100644
index 0000000..df005fa
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichProcessAllWindowFunction.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala.testutils
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.function.RichProcessAllWindowFunction
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.Collector
+
+
+class CheckingIdentityRichProcessAllWindowFunction[T, W <: Window]
+ extends RichProcessAllWindowFunction[T, T, W] {
+
+ override def process(context: Context, input: Iterable[T], out: Collector[T]): Unit = {
+ for (value <- input) {
+ out.collect(value)
+ }
+ }
+
+ override def open(conf: Configuration): Unit = {
+ super.open(conf)
+ CheckingIdentityRichProcessAllWindowFunction.openCalled = true
+ }
+
+ override def close(): Unit = {
+ super.close()
+ CheckingIdentityRichProcessAllWindowFunction.closeCalled = true
+ }
+
+ override def setRuntimeContext(context: RuntimeContext): Unit = {
+ super.setRuntimeContext(context)
+ CheckingIdentityRichProcessAllWindowFunction.contextSet = true
+ }
+}
+
+object CheckingIdentityRichProcessAllWindowFunction {
+
+ @volatile
+ private[CheckingIdentityRichProcessAllWindowFunction] var closeCalled = false
+
+ @volatile
+ private[CheckingIdentityRichProcessAllWindowFunction] var openCalled = false
+
+ @volatile
+ private[CheckingIdentityRichProcessAllWindowFunction] var contextSet = false
+
+ def reset(): Unit = {
+ closeCalled = false
+ openCalled = false
+ contextSet = false
+ }
+
+ def checkRichMethodCalls(): Unit = {
+ if (!contextSet) {
+ throw new AssertionError("context not set")
+ }
+ if (!openCalled) {
+ throw new AssertionError("open() not called")
+ }
+ if (!closeCalled) {
+ throw new AssertionError("close() not called")
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
index 7d37d1a..903179d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/WindowFoldITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -253,6 +254,78 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
Assert.assertEquals(expectedResult, testResults);
}
+ @Test
+ public void testFoldProcessAllWindow() throws Exception {
+
+ testResults = new ArrayList<>();
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
+ ctx.collect(Tuple2.of("a", 0));
+ ctx.collect(Tuple2.of("a", 1));
+ ctx.collect(Tuple2.of("a", 2));
+
+ ctx.collect(Tuple2.of("b", 3));
+ ctx.collect(Tuple2.of("b", 4));
+ ctx.collect(Tuple2.of("b", 5));
+
+ ctx.collect(Tuple2.of("a", 6));
+ ctx.collect(Tuple2.of("a", 7));
+ ctx.collect(Tuple2.of("a", 8));
+
+ // source is finite, so it will have an implicit MAX watermark when it finishes
+ }
+
+ @Override
+ public void cancel() {}
+
+ }).assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());
+
+ source1
+ .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+ .fold(Tuple2.of(0, "R:"), new FoldFunction<Tuple2<String, Integer>, Tuple2<Integer, String>>() {
+ @Override
+ public Tuple2<Integer, String> fold(Tuple2<Integer, String> accumulator, Tuple2<String, Integer> value) throws Exception {
+ accumulator.f1 += value.f0;
+ accumulator.f0 += value.f1;
+ return accumulator;
+ }
+ }, new ProcessAllWindowFunction<Tuple2<Integer, String>, Tuple3<String, Integer, Integer>, TimeWindow>() {
+ @Override
+ public void process(Context context, Iterable<Tuple2<Integer, String>> elements, Collector<Tuple3<String, Integer, Integer>> out) throws Exception {
+ int i = 0;
+ for (Tuple2<Integer, String> in : elements) {
+ out.collect(new Tuple3<>(in.f1, in.f0, i++));
+ }
+ }
+ })
+ .addSink(new SinkFunction<Tuple3<String, Integer, Integer>>() {
+ @Override
+ public void invoke(Tuple3<String, Integer, Integer> value) throws Exception {
+ testResults.add(value.toString());
+ }
+ });
+
+ env.execute("Fold Process Window Test");
+
+ List<String> expectedResult = Arrays.asList(
+ "(R:aaa,3,0)",
+ "(R:aaa,21,0)",
+ "(R:bbb,12,0)");
+
+ Collections.sort(expectedResult);
+ Collections.sort(testResults);
+
+ Assert.assertEquals(expectedResult, testResults);
+ }
+
private static class Tuple2TimestampExtractor implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> {
@Override
[2/2] flink git commit: [FLINK-5157] [streaming] Introduce
ProcessAllWindowFunction
Posted by al...@apache.org.
[FLINK-5157] [streaming] Introduce ProcessAllWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/788b8392
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/788b8392
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/788b8392
Branch: refs/heads/master
Commit: 788b839213811c6f2407ac6d54fef28dfa3d29a6
Parents: 87b9077
Author: Ventura Del Monte <ve...@gmail.com>
Authored: Wed Feb 22 14:55:17 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Feb 28 14:02:56 2017 +0100
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 398 ++++++++++++++++-
.../FoldApplyProcessAllWindowFunction.java | 120 +++++
.../windowing/ProcessAllWindowFunction.java | 59 +++
.../ReduceApplyProcessAllWindowFunction.java | 80 ++++
.../windowing/RichProcessAllWindowFunction.java | 84 ++++
...ternalAggregateProcessAllWindowFunction.java | 83 ++++
...nternalIterableProcessAllWindowFunction.java | 63 +++
...rnalSingleValueProcessAllWindowFunction.java | 65 +++
...nternalSingleValueProcessWindowFunction.java | 3 +-
.../FoldApplyProcessWindowFunctionTest.java | 99 +++++
.../operators/StateDescriptorPassingTest.java | 19 +
.../functions/InternalWindowFunctionTest.java | 193 +++++++-
.../windowing/AllWindowTranslationTest.java | 445 +++++++++++++++++++
.../streaming/api/scala/AllWindowedStream.scala | 186 +++++++-
.../function/ProcessAllWindowFunction.scala | 59 +++
.../function/RichProcessAllWindowFunction.scala | 86 ++++
.../ScalaProcessWindowFunctionWrapper.scala | 85 +++-
.../api/scala/AllWindowTranslationTest.scala | 410 ++++++++++++++++-
.../streaming/api/scala/WindowFoldITCase.scala | 60 ++-
.../api/scala/WindowFunctionITCase.scala | 51 ++-
.../api/scala/WindowReduceITCase.scala | 59 ++-
...ngIdentityRichProcessAllWindowFunction.scala | 81 ++++
.../streaming/runtime/WindowFoldITCase.java | 73 +++
23 files changed, 2830 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 742a2ed..a45cb0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -40,9 +40,12 @@ import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.AggregateApplyAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ReduceApplyProcessAllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -52,8 +55,12 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -287,6 +294,102 @@ public class AllWindowedStream<T, W extends Window> {
return input.transform(opName, resultType, operator).forceNonParallel();
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> reduce(
+ ReduceFunction<T> reduceFunction,
+ ProcessAllWindowFunction<T, R, W> function) {
+
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, ProcessAllWindowFunction.class, true, true, input.getType(), null, false);
+
+ return reduce(reduceFunction, function, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given reducer.
+ *
+ * @param reduceFunction The reduce function that is used for incremental aggregation.
+ * @param function The process window function.
+ * @param resultType Type information for the result type of the window function
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ if (reduceFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
+ }
+
+ //clean the closures
+ function = input.getExecutionEnvironment().clean(function);
+ reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "AllWindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, Byte> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
+ reduceFunction,
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator =
+ new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessAllWindowFunction<>(function),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator).forceNonParallel();
+ }
+
// ------------------------------------------------------------------------
// AggregateFunction
// ------------------------------------------------------------------------
@@ -483,6 +586,137 @@ public class AllWindowedStream<T, W extends Window> {
return input.transform(opName, resultType, operator).forceNonParallel();
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+ * that the window function typically has only a single value to process when called.
+ *
+ * @param aggFunction The aggregate function that is used for incremental aggregation.
+ * @param windowFunction The process window function.
+ *
+ * @return The data stream that is the result of applying the window function to the window.
+ *
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <R> The type of the elements in the resulting stream, equal to the
+ * WindowFunction's result type
+ */
+ @PublicEvolving
+ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, V> aggFunction,
+ ProcessAllWindowFunction<V, R, W> windowFunction) {
+
+ checkNotNull(aggFunction, "aggFunction");
+ checkNotNull(windowFunction, "windowFunction");
+
+ TypeInformation<ACC> accumulatorType = TypeExtractor.getAggregateFunctionAccumulatorType(
+ aggFunction, input.getType(), null, false);
+
+ TypeInformation<V> aggResultType = TypeExtractor.getAggregateFunctionReturnType(
+ aggFunction, input.getType(), null, false);
+
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ windowFunction, ProcessAllWindowFunction.class, true, true, aggResultType, null, false);
+
+ return aggregate(aggFunction, windowFunction, accumulatorType, aggResultType, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>Arriving data is incrementally aggregated using the given aggregate function. This means
+ * that the window function typically has only a single value to process when called.
+ *
+ * @param aggregateFunction The aggregation function that is used for incremental aggregation.
+ * @param windowFunction The process window function.
+ * @param accumulatorType Type information for the internal accumulator type of the aggregation function
+ * @param resultType Type information for the result type of the window function
+ *
+ * @return The data stream that is the result of applying the window function to the window.
+ *
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of AggregateFunction's result, and the WindowFunction's input
+ * @param <R> The type of the elements in the resulting stream, equal to the
+ * WindowFunction's result type
+ */
+ @PublicEvolving
+ public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
+ AggregateFunction<T, ACC, V> aggregateFunction,
+ ProcessAllWindowFunction<V, R, W> windowFunction,
+ TypeInformation<ACC> accumulatorType,
+ TypeInformation<V> aggregateResultType,
+ TypeInformation<R> resultType) {
+
+ checkNotNull(aggregateFunction, "aggregateFunction");
+ checkNotNull(windowFunction, "windowFunction");
+ checkNotNull(accumulatorType, "accumulatorType");
+ checkNotNull(aggregateResultType, "aggregateResultType");
+ checkNotNull(resultType, "resultType");
+
+ if (aggregateFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("This aggregate function cannot be a RichFunction.");
+ }
+
+ //clean the closures
+ windowFunction = input.getExecutionEnvironment().clean(windowFunction);
+ aggregateFunction = input.getExecutionEnvironment().clean(aggregateFunction);
+
+ final String callLocation = Utils.getCallLocationName();
+ final String udfName = "AllWindowedStream." + callLocation;
+
+ final String opName;
+ final KeySelector<T, Byte> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(
+ input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator = new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
+ "window-contents",
+ aggregateFunction,
+ accumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator = new WindowOperator<>(
+ windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessAllWindowFunction<>(windowFunction),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator).forceNonParallel();
+ }
+
// ------------------------------------------------------------------------
// FoldFunction
// ------------------------------------------------------------------------
@@ -630,13 +864,119 @@ public class AllWindowedStream<T, W extends Window> {
return input.transform(opName, resultType, operator).forceNonParallel();
}
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given fold function.
+ *
+ * @param initialValue The initial value of the fold.
+ * @param foldFunction The fold function that is used for incremental aggregation.
+ * @param function The window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> function) {
+
+ TypeInformation<ACC> foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(),
+ Utils.getCallLocationName(), true);
+
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, ProcessAllWindowFunction.class, true, true, foldAccumulatorType, null, false);
+
+ return fold(initialValue, foldFunction, function, foldAccumulatorType, resultType);
+ }
+
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window for each key individually. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Arriving data is incrementally aggregated using the given fold function.
+ *
+ * @param initialValue The initial value of the fold.
+ * @param foldFunction The fold function that is used for incremental aggregation.
+ * @param function The process window function.
+ * @param foldAccumulatorType Type information for the result type of the fold function
+ * @param resultType Type information for the result type of the window function
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <ACC, R> SingleOutputStreamOperator<R> fold(ACC initialValue,
+ FoldFunction<T, ACC> foldFunction,
+ ProcessAllWindowFunction<ACC, R, W> function,
+ TypeInformation<ACC> foldAccumulatorType,
+ TypeInformation<R> resultType) {
+ if (foldFunction instanceof RichFunction) {
+ throw new UnsupportedOperationException("FoldFunction of fold can not be a RichFunction.");
+ }
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+ }
+
+ //clean the closures
+ function = input.getExecutionEnvironment().clean(function);
+ foldFunction = input.getExecutionEnvironment().clean(foldFunction);
+
+ String callLocation = Utils.getCallLocationName();
+ String udfName = "AllWindowedStream." + callLocation;
+
+ String opName;
+ KeySelector<T, Byte> keySel = input.getKeySelector();
+
+ OneInputStreamOperator<T, R> operator;
+
+ if (evictor != null) {
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ TypeSerializer<StreamRecord<T>> streamRecordSerializer =
+ (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
+
+ ListStateDescriptor<StreamRecord<T>> stateDesc =
+ new ListStateDescriptor<>("window-contents", streamRecordSerializer);
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";
+
+ operator =
+ new EvictingWindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
+ trigger,
+ evictor,
+ allowedLateness);
+
+ } else {
+ FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
+ initialValue, foldFunction, foldAccumulatorType.createSerializer(getExecutionEnvironment().getConfig()));
+
+ opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";
+
+ operator =
+ new WindowOperator<>(windowAssigner,
+ windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
+ keySel,
+ input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
+ stateDesc,
+ new InternalSingleValueProcessAllWindowFunction<>(function),
+ trigger,
+ allowedLateness);
+ }
+
+ return input.transform(opName, resultType, operator).forceNonParallel();
+ }
+
// ------------------------------------------------------------------------
// Apply (Window Function)
// ------------------------------------------------------------------------
/**
* Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
+ * evaluation of the window. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>
@@ -647,15 +987,16 @@ public class AllWindowedStream<T, W extends Window> {
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function) {
+ String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, getInputType(), null, false);
-
- return apply(function, resultType);
+ return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
}
/**
* Applies the given window function to each window. The window function is called for each
- * evaluation of the window for each key individually. The output of the window function is
+ * evaluation of the window. The output of the window function is
* interpreted as a regular non-windowed stream.
*
* <p>
@@ -663,15 +1004,56 @@ public class AllWindowedStream<T, W extends Window> {
* is evaluated, as the function provides no means of incremental aggregation.
*
* @param function The window function.
- * @param resultType Type information for the result type of the window function
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> SingleOutputStreamOperator<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
+ String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
+ return apply(new InternalIterableAllWindowFunction<>(function), resultType, callLocation);
+ }
- //clean the closure
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Not that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function) {
+ String callLocation = Utils.getCallLocationName();
function = input.getExecutionEnvironment().clean(function);
+ TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
+ function, ProcessAllWindowFunction.class, true, true, getInputType(), null, false);
+ return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+ }
+ /**
+ * Applies the given window function to each window. The window function is called for each
+ * evaluation of the window. The output of the window function is
+ * interpreted as a regular non-windowed stream.
+ *
+ * <p>
+ * Not that this function requires that all data in the windows is buffered until the window
+ * is evaluated, as the function provides no means of incremental aggregation.
+ *
+ * @param function The process window function.
+ * @return The data stream that is the result of applying the window function to the window.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessAllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
String callLocation = Utils.getCallLocationName();
+ function = input.getExecutionEnvironment().clean(function);
+ return apply(new InternalIterableProcessAllWindowFunction<>(function), resultType, callLocation);
+ }
+
+ private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, Byte, W> function, TypeInformation<R> resultType, String callLocation) {
+
String udfName = "AllWindowedStream." + callLocation;
String opName;
@@ -695,7 +1077,7 @@ public class AllWindowedStream<T, W extends Window> {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
- new InternalIterableAllWindowFunction<>(function),
+ function,
trigger,
evictor,
allowedLateness);
@@ -712,7 +1094,7 @@ public class AllWindowedStream<T, W extends Window> {
keySel,
input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
stateDesc,
- new InternalIterableAllWindowFunction<>(function),
+ function,
trigger,
allowedLateness);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..5ac6766
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+@Internal
+public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
+ extends RichProcessAllWindowFunction<T, R, W>
+ implements OutputTypeConfigurable<R> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FoldFunction<T, ACC> foldFunction;
+ private final ProcessAllWindowFunction<ACC, R, W> windowFunction;
+
+ private byte[] serializedInitialValue;
+ private TypeSerializer<ACC> accSerializer;
+ private final TypeInformation<ACC> accTypeInformation;
+ private transient ACC initialValue;
+
+ public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
+ this.windowFunction = windowFunction;
+ this.foldFunction = foldFunction;
+ this.initialValue = initialValue;
+ this.accTypeInformation = accTypeInformation;
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ FunctionUtils.openFunction(this.windowFunction, configuration);
+
+ if (serializedInitialValue == null) {
+ throw new RuntimeException("No initial value was serialized for the fold " +
+ "window function. Probably the setOutputType method was not called.");
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
+ DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
+ initialValue = accSerializer.deserialize(in);
+ }
+
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(this.windowFunction);
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
+
+ FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+ }
+
+ @Override
+ public void process(final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+ ACC result = accSerializer.copy(initialValue);
+
+ for (T val : values) {
+ result = foldFunction.fold(result, val);
+ }
+
+ windowFunction.process(windowFunction.new Context() {
+ @Override
+ public W window() {
+ return context.window();
+ }
+ }, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
+ accSerializer = accTypeInformation.createSerializer(executionConfig);
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos);
+
+ try {
+ accSerializer.serialize(initialValue, out);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Unable to serialize initial value of type " +
+ initialValue.getClass().getSimpleName() + " of fold window function.", ioe);
+ }
+
+ serializedInitialValue = baos.toByteArray();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
new file mode 100644
index 0000000..622e020
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Base abstract class for functions that are evaluated over non-keyed windows using a context
+ * for retrieving extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implements Function {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Evaluates the window and outputs none or several elements.
+ *
+ * @param context The context in which the window is being evaluated.
+ * @param elements The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
+
+ /**
+ * The context holding window metadata
+ */
+ public abstract class Context {
+ /**
+ * @return The window that is being evaluated.
+ */
+ public abstract W window();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
new file mode 100644
index 0000000..142c71e
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+@Internal
+public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
+ extends RichProcessAllWindowFunction<T, R, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ReduceFunction<T> reduceFunction;
+ private final ProcessAllWindowFunction<T, R, W> windowFunction;
+
+ public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
+ this.windowFunction = windowFunction;
+ this.reduceFunction = reduceFunction;
+ }
+
+ @Override
+ public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception {
+
+ T curr = null;
+ for (T val: input) {
+ if (curr == null) {
+ curr = val;
+ } else {
+ curr = reduceFunction.reduce(curr, val);
+ }
+ }
+ windowFunction.process(windowFunction.new Context() {
+ @Override
+ public W window() {
+ return context.window();
+ }
+ }, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ FunctionUtils.openFunction(this.windowFunction, configuration);
+ }
+
+ @Override
+ public void close() throws Exception {
+ FunctionUtils.closeFunction(this.windowFunction);
+ }
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ super.setRuntimeContext(t);
+
+ FunctionUtils.setFunctionRuntimeContext(this.windowFunction, t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
new file mode 100644
index 0000000..1130fa5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichProcessAllWindowFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Base rich abstract class for functions that are evaluated over keyed (grouped) windows using a context
+ * for passing extra information.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
+@PublicEvolving
+public abstract class RichProcessAllWindowFunction<IN, OUT, W extends Window>
+ extends ProcessAllWindowFunction<IN, OUT, W>
+ implements RichFunction {
+
+ private static final long serialVersionUID = 1L;
+
+
+ // --------------------------------------------------------------------------------------------
+ // Runtime context access
+ // --------------------------------------------------------------------------------------------
+
+ private transient RuntimeContext runtimeContext;
+
+ @Override
+ public void setRuntimeContext(RuntimeContext t) {
+ this.runtimeContext = t;
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ if (this.runtimeContext != null) {
+ return this.runtimeContext;
+ } else {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ }
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ if (this.runtimeContext == null) {
+ throw new IllegalStateException("The runtime context has not been initialized.");
+ } else if (this.runtimeContext instanceof IterationRuntimeContext) {
+ return (IterationRuntimeContext) this.runtimeContext;
+ } else {
+ throw new IllegalStateException("This stub is not part of an iteration step function.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Default life cycle methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void open(Configuration parameters) throws Exception {}
+
+ @Override
+ public void close() throws Exception {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
new file mode 100644
index 0000000..9533c95
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an
+ * {@code Iterable} and an {@link AggregateFunction}.
+ *
+ * @param <W> The window type
+ * @param <T> The type of the input to the AggregateFunction
+ * @param <ACC> The type of the AggregateFunction's accumulator
+ * @param <V> The type of the AggregateFunction's result, and the input to the WindowFunction
+ * @param <R> The result type of the WindowFunction
+ */
+public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W extends Window>
+ extends WrappingFunction<ProcessAllWindowFunction<V, R, W>>
+ implements InternalWindowFunction<Iterable<T>, R, Byte, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final AggregateFunction<T, ACC, V> aggFunction;
+
+ public InternalAggregateProcessAllWindowFunction(
+ AggregateFunction<T, ACC, V> aggFunction,
+ ProcessAllWindowFunction<V, R, W> windowFunction) {
+ super(windowFunction);
+ this.aggFunction = aggFunction;
+ }
+
+ @Override
+ public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+ ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+ ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ final ACC acc = aggFunction.createAccumulator();
+
+ for (T val : input) {
+ aggFunction.add(val, acc);
+ }
+
+ wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
new file mode 100644
index 0000000..e33cc2a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable}
+ * when the window state also is an {@code Iterable}.
+ */
+public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends Window>
+ extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+ implements InternalWindowFunction<Iterable<IN>, OUT, Byte, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ wrappedFunction.process(context, input, out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
new file mode 100644
index 0000000..0284ef7
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.Collector;
+
+import java.util.Collections;
+
+/**
+ * Internal window function for wrapping a {@link ProcessAllWindowFunction} that takes an {@code Iterable}
+ * when the window state is a single value.
+ */
+public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extends Window>
+ extends WrappingFunction<ProcessAllWindowFunction<IN, OUT, W>>
+ implements InternalWindowFunction<IN, OUT, Byte, W> {
+
+ private static final long serialVersionUID = 1L;
+
+ public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
+ super(wrappedFunction);
+ }
+
+ @Override
+ public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
+ @Override
+ public W window() {
+ return window;
+ }
+ };
+
+ wrappedFunction.process(context, Collections.singletonList(input), out);
+ }
+
+ @Override
+ public RuntimeContext getRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+
+ @Override
+ public IterationRuntimeContext getIterationRuntimeContext() {
+ throw new RuntimeException("This should never be called.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index b28c208..7a4e8c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -21,14 +21,13 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
import java.util.Collections;
/**
- * Internal window function for wrapping a {@link WindowFunction} that takes an {@code Iterable}
+ * Internal window function for wrapping a {@link ProcessWindowFunction} that takes an {@code Iterable}
* when the window state is a single value.
*/
public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W extends Window>
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index af5c77a..734879d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -22,14 +22,17 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -39,6 +42,7 @@ import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -145,6 +149,101 @@ public class FoldApplyProcessWindowFunctionTest {
Assert.assertEquals(expected, result);
}
+ /**
+ * Tests that the FoldWindowFunction gets the output type serializer set by the
+ * StreamGraphGenerator and checks that the FoldWindowFunction computes the correct result.
+ */
+ @Test
+ public void testFoldAllWindowFunctionOutputTypeConfigurable() throws Exception{
+ StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+
+ List<StreamTransformation<?>> transformations = new ArrayList<>();
+
+ int initValue = 1;
+
+ FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer> foldWindowFunction = new FoldApplyProcessAllWindowFunction<>(
+ initValue,
+ new FoldFunction<Integer, Integer>() {
+ @Override
+ public Integer fold(Integer accumulator, Integer value) throws Exception {
+ return accumulator + value;
+ }
+
+ },
+ new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
+ @Override
+ public void process(Context context,
+ Iterable<Integer> input,
+ Collector<Integer> out) throws Exception {
+ for (Integer in: input) {
+ out.collect(in);
+ }
+ }
+ },
+ BasicTypeInfo.INT_TYPE_INFO
+ );
+
+ AccumulatingProcessingTimeWindowOperator<Byte, Integer, Integer> windowOperator = new AccumulatingProcessingTimeWindowOperator<>(
+ new InternalIterableProcessAllWindowFunction<>(foldWindowFunction),
+ new KeySelector<Integer, Byte>() {
+ private static final long serialVersionUID = -7951310554369722809L;
+
+ @Override
+ public Byte getKey(Integer value) throws Exception {
+ return 0;
+ }
+ },
+ ByteSerializer.INSTANCE,
+ IntSerializer.INSTANCE,
+ 3000,
+ 3000
+ );
+
+ SourceFunction<Integer> sourceFunction = new SourceFunction<Integer>(){
+
+ private static final long serialVersionUID = 8297735565464653028L;
+
+ @Override
+ public void run(SourceContext<Integer> ctx) throws Exception {
+
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ };
+
+ SourceTransformation<Integer> source = new SourceTransformation<>("", new StreamSource<>(sourceFunction), BasicTypeInfo.INT_TYPE_INFO, 1);
+
+ transformations.add(new OneInputTransformation<>(source, "test", windowOperator, BasicTypeInfo.INT_TYPE_INFO, 1));
+
+ StreamGraph streamGraph = StreamGraphGenerator.generate(env, transformations);
+
+ List<Integer> result = new ArrayList<>();
+ List<Integer> input = new ArrayList<>();
+ List<Integer> expected = new ArrayList<>();
+
+ input.add(1);
+ input.add(2);
+ input.add(3);
+
+ for (int value : input) {
+ initValue += value;
+ }
+
+ expected.add(initValue);
+
+ foldWindowFunction.process(foldWindowFunction.new Context() {
+ @Override
+ public TimeWindow window() {
+ return new TimeWindow(0, 1);
+ }
+ }, input, new ListCollector<>(result));
+
+ Assert.assertEquals(expected, result);
+ }
+
public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
index 813ca96..f306231 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateDescriptorPassingTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -162,6 +163,24 @@ public class StateDescriptorPassingTest {
}
@Test
+ public void testProcessAllWindowState() throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);
+
+ DataStream<File> src = env.fromElements(new File("/"));
+
+ SingleOutputStreamOperator<?> result = src
+ .timeWindowAll(Time.milliseconds(1000))
+ .process(new ProcessAllWindowFunction<File, String, TimeWindow>() {
+ @Override
+ public void process(Context ctx, Iterable<File> input, Collector<String> out) {}
+ });
+
+ validateListStateDescriptorConfigured(result);
+ }
+
+ @Test
public void testFoldWindowAllState() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
http://git-wip-us.apache.org/repos/asf/flink/blob/788b8392/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index e49a496..8f795e9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -26,15 +26,19 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueAllWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.util.Collector;
@@ -99,6 +103,47 @@ public class InternalWindowFunctionTest {
@SuppressWarnings("unchecked")
@Test
+ public void testInternalIterableProcessAllWindowFunction() throws Exception {
+
+ ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class);
+ InternalIterableProcessAllWindowFunction<Long, String, TimeWindow> windowFunction =
+ new InternalIterableProcessAllWindowFunction<>(mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ windowFunction.apply(((byte)0), w, i, c);
+ verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testInternalIterableWindowFunction() throws Exception {
WindowFunctionMock mock = mock(WindowFunctionMock.class);
@@ -263,6 +308,47 @@ public class InternalWindowFunctionTest {
@SuppressWarnings("unchecked")
@Test
+ public void testInternalSingleValueProcessAllWindowFunction() throws Exception {
+
+ ProcessAllWindowFunctionMock mock = mock(ProcessAllWindowFunctionMock.class);
+ InternalSingleValueProcessAllWindowFunction<Long, String, TimeWindow> windowFunction =
+ new InternalSingleValueProcessAllWindowFunction<>(mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ windowFunction.apply(((byte)0), w, 23L, c);
+ verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void testInternalSingleValueProcessWindowFunction() throws Exception {
ProcessWindowFunctionMock mock = mock(ProcessWindowFunctionMock.class);
@@ -310,7 +396,7 @@ public class InternalWindowFunctionTest {
InternalAggregateProcessWindowFunction<Long, Set<Long>, Map<Long, Long>, String, Long, TimeWindow> windowFunction =
new InternalAggregateProcessWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
private static final long serialVersionUID = 1L;
-
+
@Override
public Set<Long> createAccumulator() {
return new HashSet<>();
@@ -364,7 +450,7 @@ public class InternalWindowFunctionTest {
List<Long> args = new LinkedList<>();
args.add(23L);
args.add(24L);
-
+
windowFunction.apply(42L, w, args, c);
verify(mock).process(
eq(42L),
@@ -379,6 +465,83 @@ public class InternalWindowFunctionTest {
verify(mock).close();
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testInternalAggregateProcessAllWindowFunction() throws Exception {
+
+ AggregateProcessAllWindowFunctionMock mock = mock(AggregateProcessAllWindowFunctionMock.class);
+
+ InternalAggregateProcessAllWindowFunction<Long, Set<Long>, Map<Long, Long>, String, TimeWindow> windowFunction =
+ new InternalAggregateProcessAllWindowFunction<>(new AggregateFunction<Long, Set<Long>, Map<Long, Long>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Set<Long> createAccumulator() {
+ return new HashSet<>();
+ }
+
+ @Override
+ public void add(Long value, Set<Long> accumulator) {
+ accumulator.add(value);
+ }
+
+ @Override
+ public Map<Long, Long> getResult(Set<Long> accumulator) {
+ Map<Long, Long> result = new HashMap<>();
+ for (Long in : accumulator) {
+ result.put(in, in);
+ }
+ return result;
+ }
+
+ @Override
+ public Set<Long> merge(Set<Long> a, Set<Long> b) {
+ a.addAll(b);
+ return a;
+ }
+ }, mock);
+
+ // check setOutputType
+ TypeInformation<String> stringType = BasicTypeInfo.STRING_TYPE_INFO;
+ ExecutionConfig execConf = new ExecutionConfig();
+ execConf.setParallelism(42);
+
+ StreamingFunctionUtils.setOutputType(windowFunction, stringType, execConf);
+ verify(mock).setOutputType(stringType, execConf);
+
+ // check open
+ Configuration config = new Configuration();
+
+ windowFunction.open(config);
+ verify(mock).open(config);
+
+ // check setRuntimeContext
+ RuntimeContext rCtx = mock(RuntimeContext.class);
+
+ windowFunction.setRuntimeContext(rCtx);
+ verify(mock).setRuntimeContext(rCtx);
+
+ // check apply
+ TimeWindow w = mock(TimeWindow.class);
+ Collector<String> c = (Collector<String>) mock(Collector.class);
+
+ List<Long> args = new LinkedList<>();
+ args.add(23L);
+ args.add(24L);
+
+ windowFunction.apply(((byte)0), w, args, c);
+ verify(mock).process(
+ (AggregateProcessAllWindowFunctionMock.Context) anyObject(),
+ (Iterable) argThat(containsInAnyOrder(allOf(
+ hasEntry(is(23L), is(23L)),
+ hasEntry(is(24L), is(24L))))),
+ eq(c));
+
+ // check close
+ windowFunction.close();
+ verify(mock).close();
+ }
+
public static class ProcessWindowFunctionMock
extends RichProcessWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
@@ -405,6 +568,19 @@ public class InternalWindowFunctionTest {
public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
}
+ public static class AggregateProcessAllWindowFunctionMock
+ extends RichProcessAllWindowFunction<Map<Long, Long>, String, TimeWindow>
+ implements OutputTypeConfigurable<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+ @Override
+ public void process(Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+ }
+
public static class WindowFunctionMock
extends RichWindowFunction<Long, String, Long, TimeWindow>
implements OutputTypeConfigurable<String> {
@@ -430,4 +606,17 @@ public class InternalWindowFunctionTest {
@Override
public void apply(TimeWindow window, Iterable<Long> values, Collector<String> out) throws Exception { }
}
+
+ public static class ProcessAllWindowFunctionMock
+ extends RichProcessAllWindowFunction<Long, String, TimeWindow>
+ implements OutputTypeConfigurable<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
+
+ @Override
+ public void process(Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+ }
}