You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/25 16:01:36 UTC
[1/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State
in ProcessWindowFunction
Repository: flink
Updated Branches:
refs/heads/master 5c37e55c8 -> 662ed33d8
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index d4fefa2..6f34607 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
@@ -41,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.Collector;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Test;
@@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.core.AllOf.allOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class InternalWindowFunctionTest {
@@ -93,7 +101,9 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(((byte)0), w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+
+ windowFunction.process(((byte)0), w, ctx, i, c);
verify(mock).apply(w, i, c);
// check close
@@ -134,7 +144,8 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(((byte)0), w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+ windowFunction.process(((byte)0), w, ctx, i, c);
verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
// check close
@@ -175,7 +186,8 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(42L, w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+ windowFunction.process(42L, w, ctx, i, c);
verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
// check close
@@ -215,8 +227,9 @@ public class InternalWindowFunctionTest {
TimeWindow w = mock(TimeWindow.class);
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, i, c);
+ windowFunction.process(42L, w, ctx, i, c);
verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
// check close
@@ -256,8 +269,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, 23L, c);
+ windowFunction.process(42L, w, ctx, 23L, c);
verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -297,8 +311,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, 23L, c);
+ windowFunction.process(((byte)0), w, ctx, 23L, c);
verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -338,8 +353,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, 23L, c);
+ windowFunction.process(((byte)0), w, ctx, 23L, c);
verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -378,8 +394,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, 23L, c);
+ windowFunction.process(42L, w, ctx,23L, c);
verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -450,8 +467,9 @@ public class InternalWindowFunctionTest {
List<Long> args = new LinkedList<>();
args.add(23L);
args.add(24L);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, args, c);
+ windowFunction.process(42L, w, ctx, args, c);
verify(mock).process(
eq(42L),
(AggregateProcessWindowFunctionMock.Context) anyObject(),
@@ -528,8 +546,9 @@ public class InternalWindowFunctionTest {
List<Long> args = new LinkedList<>();
args.add(23L);
args.add(24L);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, args, c);
+ windowFunction.process(((byte)0), w, ctx, args, c);
verify(mock).process(
(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
(Iterable) argThat(containsInAnyOrder(allOf(
@@ -552,7 +571,9 @@ public class InternalWindowFunctionTest {
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
@Override
- public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+ public void process(Long aLong, ProcessWindowFunction<Long, String, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
+
+ }
}
public static class AggregateProcessWindowFunctionMock
@@ -565,7 +586,9 @@ public class InternalWindowFunctionTest {
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
@Override
- public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+ public void process(Long aLong, ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> elements, Collector<String> out) throws Exception {
+
+ }
}
public static class AggregateProcessAllWindowFunctionMock
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index 11508c5..ff1cbdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
testHarness.processElement(new StreamRecord<>(1, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
testHarness.processElement(new StreamRecord<>(1, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8aae46a..faab505 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends TestLogger {
return Mockito.any();
}
+ static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() {
+ return Mockito.any();
+ }
+
static Trigger.OnMergeContext anyOnMergeContext() {
return Mockito.any();
}
@@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
}
@Test
@@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
}
@Test
@@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
- Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ Collector<String> out = invocation.getArgumentAt(4, Collector.class);
out.collect("Hallo");
out.collect("Ciao");
return null;
}
- }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
- Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ Collector<String> out = invocation.getArgumentAt(4, Collector.class);
out.collect("Hallo");
out.collect("Ciao");
return null;
}
- }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, never()).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
timeAdaptor.shouldFireOnTime(mockTrigger);
timeAdaptor.advanceTime(testHarness, 1L);
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.advanceTime(testHarness, 0L);
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.advanceTime(testHarness, 0L);
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
}
@@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
}
@@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
// now we trigger the dangling timer
timeAdaptor.advanceTime(testHarness, 10L);
@@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockTrigger, times(2)).clear(anyTimeWindow(), anyTriggerContext());
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// it's also called for the cleanup timers
verify(mockTrigger, times(4)).onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext());
@@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.numEventTimeTimers());
}
+ @Test
+ public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception {
+ testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception {
+ testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor());
+ }
+
+ public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+ .thenReturn(TriggerResult.FIRE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ context.windowState().getState(valueStateDescriptor).update("hello");
+ return null;
+ }
+ }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+ context.windowState().getState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus value state
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers
+
+ timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+ }
+
+ @Test
+ public void testWindowStateNotAvailableToMergingWindows() throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+ .thenReturn(TriggerResult.FIRE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ context.windowState().getState(valueStateDescriptor).update("hello");
+ return null;
+ }
+ }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+ expectedException.expect(UnsupportedOperationException.class);
+ expectedException.expectMessage("Per-window state is not allowed when using merging windows.");
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+ }
+
protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
WindowAssigner<Integer, W> assigner,
Trigger<Integer, W> trigger,
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 163117b..2f0e48e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
/**
+ * Deletes any state in the [[Context]] when the Window is purged.
+ *
+ * @param context The context to which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def clear(context: Context) {}
+
+ /**
* The context holding window metadata
*/
abstract class Context {
@@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
* @return The window that is being evaluated.
*/
def window: W
+
+ /**
+ * State accessor for per-key and per-window state.
+ */
+ def windowState: KeyedStateStore
+
+ /**
+ * State accessor for per-key global state.
+ */
+ def globalState: KeyedStateStore
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index 79f3918..bdf6ae6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
/**
+ * Deletes any state in the [[Context]] when the Window is purged.
+ *
+ * @param context The context to which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def clear(context: Context) {}
+
+ /**
* The context holding window metadata
*/
abstract class Context {
@@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
* @return The window that is being evaluated.
*/
def window: W
+
+ /**
+ * State accessor for per-key and per-window state.
+ */
+ def windowState: KeyedStateStore
+
+ /**
+ * State accessor for per-key global state.
+ */
+ def globalState: KeyedStateStore
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index a4fec64..fac5958 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
out: Collector[OUT]): Unit = {
val ctx = new func.Context {
override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
}
func.process(key, ctx, elements.asScala, out)
}
+ override def clear(context: JProcessWindowFunction[IN, OUT, KEY, W]#Context): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
+ }
+ func.clear(ctx)
+ }
+
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
@@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
out: Collector[OUT]): Unit = {
val ctx = new func.Context {
override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
}
func.process(ctx, elements.asScala, out)
}
+ override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
+ }
+ func.clear(ctx)
+ }
+
+
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
[2/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State
in ProcessWindowFunction
Posted by al...@apache.org.
[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf
Branch: refs/heads/master
Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8
Parents: 5c37e55
Author: Seth Wiesman <sw...@mediamath.com>
Authored: Sun Mar 5 23:07:18 2017 -0500
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 25 16:59:17 2017 +0100
----------------------------------------------------------------------
.../FoldApplyProcessAllWindowFunction.java | 23 +-
.../FoldApplyProcessWindowFunction.java | 23 +-
.../InternalProcessApplyAllWindowContext.java | 57 +++++
.../InternalProcessApplyWindowContext.java | 58 +++++
.../windowing/ProcessAllWindowFunction.java | 22 ++
.../windowing/ProcessWindowFunction.java | 24 +-
.../ReduceApplyProcessAllWindowFunction.java | 23 +-
.../ReduceApplyProcessWindowFunction.java | 21 +-
.../windowing/AccumulatingKeyedTimePanes.java | 75 ++++++-
.../windowing/EvictingWindowOperator.java | 62 +++---
.../operators/windowing/WindowOperator.java | 220 +++++++++++++++----
...ternalAggregateProcessAllWindowFunction.java | 28 ++-
.../InternalAggregateProcessWindowFunction.java | 28 ++-
.../InternalIterableAllWindowFunction.java | 7 +-
...nternalIterableProcessAllWindowFunction.java | 31 ++-
.../InternalIterableProcessWindowFunction.java | 24 +-
.../InternalIterableWindowFunction.java | 7 +-
.../InternalProcessAllWindowContext.java | 57 +++++
.../functions/InternalProcessWindowContext.java | 58 +++++
.../InternalSingleValueAllWindowFunction.java | 7 +-
...rnalSingleValueProcessAllWindowFunction.java | 29 ++-
...nternalSingleValueProcessWindowFunction.java | 24 +-
.../InternalSingleValueWindowFunction.java | 7 +-
.../functions/InternalWindowFunction.java | 26 ++-
.../FoldApplyProcessWindowFunctionTest.java | 82 ++++++-
.../functions/InternalWindowFunctionTest.java | 49 +++--
.../RegularWindowOperatorContractTest.java | 12 +-
.../windowing/WindowOperatorContractTest.java | 158 ++++++++++---
.../function/ProcessAllWindowFunction.scala | 20 ++
.../scala/function/ProcessWindowFunction.scala | 20 ++
.../ScalaProcessWindowFunctionWrapper.scala | 31 +++
31 files changed, 1091 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 5ac6766..8e8e52c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
private TypeSerializer<ACC> accSerializer;
private final TypeInformation<ACC> accTypeInformation;
private transient ACC initialValue;
+ private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx;
public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
this.windowFunction = windowFunction;
@@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
+
+ ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
+
}
@Override
@@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
result = foldFunction.fold(result, val);
}
- windowFunction.process(windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(result), out);
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.process(ctx, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index e1bc759..073a2f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
private TypeSerializer<ACC> accSerializer;
private final TypeInformation<ACC> accTypeInformation;
private transient ACC initialValue;
+ private transient InternalProcessApplyWindowContext<ACC, R, K, W> ctx;
public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
this.windowFunction = windowFunction;
@@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
+
+ ctx = new InternalProcessApplyWindowContext<>(windowFunction);
}
@Override
@@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
}
@Override
- public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+ public void process(K key, Context context, Iterable<T> values, Collector<R> out) throws Exception {
ACC result = accSerializer.copy(initialValue);
for (T val : values) {
result = foldFunction.fold(result, val);
}
- windowFunction.process(key, windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(result), out);
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.process(key, ctx, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception{
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
new file mode 100644
index 0000000..e1a0a98
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
+ extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+ W window;
+ KeyedStateStore windowState;
+ KeyedStateStore globalState;
+
+ InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
new file mode 100644
index 0000000..f547adc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the window key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
+ extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+ W window;
+ KeyedStateStore windowState;
+ KeyedStateStore globalState;
+
+ InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 622e020..f49aa27 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context to which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public void clear(Context context) throws Exception {}
+
+ /**
* The context holding window metadata
*/
public abstract class Context {
@@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
* @return The window that is being evaluated.
*/
public abstract W window();
+
+ /**
+ * State accessor for per-key and per-window state.
+ *
+ * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
+ * by implementing {@link ProcessWindowFunction#clear(ProcessWindowFunction.Context)}.
+ */
+ public abstract KeyedStateStore windowState();
+
+ /**
+ * State accessor for per-key global state.
+ */
+ public abstract KeyedStateStore globalState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 9c48e24..bcefaf7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context to which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public void clear(Context context) throws Exception {}
+
+ /**
* The context holding window metadata
*/
- public abstract class Context {
+ public abstract class Context implements java.io.Serializable {
/**
* @return The window that is being evaluated.
*/
public abstract W window();
+
+ /**
+ * State accessor for per-key and per-window state.
+ *
+ * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
+ * by implementing {@link ProcessWindowFunction#clear(Context)}.
+ */
+ public abstract KeyedStateStore windowState();
+
+ /**
+ * State accessor for per-key global state.
+ */
+ public abstract KeyedStateStore globalState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 142c71e..4c54c94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
private final ReduceFunction<T> reduceFunction;
private final ProcessAllWindowFunction<T, R, W> windowFunction;
+ private transient InternalProcessApplyAllWindowContext<T, R, W> ctx;
public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
this.windowFunction = windowFunction;
@@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
curr = reduceFunction.reduce(curr, val);
}
}
- windowFunction.process(windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(curr), out);
+
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.process(ctx, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.clear(ctx);
}
@Override
public void open(Configuration configuration) throws Exception {
FunctionUtils.openFunction(this.windowFunction, configuration);
+ ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 9ea1fdf..1af783a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
private final ReduceFunction<T> reduceFunction;
private final ProcessWindowFunction<T, R, K, W> windowFunction;
+ private transient InternalProcessApplyWindowContext<T, R, K, W> ctx;
public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
this.windowFunction = windowFunction;
@@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
curr = reduceFunction.reduce(curr, val);
}
}
- windowFunction.process(k, windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(curr), out);
+
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.process(k, ctx, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
public void open(Configuration configuration) throws Exception {
FunctionUtils.openFunction(this.windowFunction, configuration);
+ ctx = new InternalProcessApplyWindowContext<>(windowFunction);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 87c5aca..d58b5cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -19,6 +19,17 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.UnionIterator;
@@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
+ private final AccumulatingKeyedTimePanesContext context;
+
/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
private long evaluationPass = 1L;
@@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
+ this.context = new AccumulatingKeyedTimePanesContext();
}
// ------------------------------------------------------------------------
@@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
Key key = entry.getKey();
operator.setCurrentKey(key);
- function.apply(entry.getKey(), window, entry.getValue(), out);
+ context.globalState = operator.getKeyedStateStore();
+
+ function.process(entry.getKey(), window, context, entry.getValue(), out);
}
}
else {
// general code path for multi-pane case
WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
- function, window, out, operator);
+ function, window, out, operator, context);
traverseAllPanes(evaluator, evaluationPass);
}
@@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final TimeWindow window;
private final AbstractStreamOperator<Result> contextOperator;
-
+
private Key currentKey;
+ private AccumulatingKeyedTimePanesContext context;
WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
- Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
+ Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
this.window = window;
this.contextOperator = contextOperator;
+ this.context = context;
}
@@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
@Override
public void keyDone() throws Exception {
contextOperator.setCurrentKey(currentKey);
- function.apply(currentKey, window, unionIterator, out);
+ context.globalState = contextOperator.getKeyedStateStore();
+ function.process(currentKey, window, context, unionIterator, out);
}
}
@@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
}
+ private static class ThrowingKeyedStateStore implements KeyedStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+ }
+
+ private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
+ KeyedStateStore globalState;
+ KeyedStateStore throwingStore;
+
+ public AccumulatingKeyedTimePanesContext() {
+ this.throwingStore = new ThrowingKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return throwingStore;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+ }
+
private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24c8d32..85451a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
" window: " + mergeResult);
}
- context.key = key;
- context.window = mergeResult;
+ triggerContext.key = key;
+ triggerContext.window = mergeResult;
- context.onMerge(mergedWindows);
+ triggerContext.onMerge(mergedWindows);
for (W m : mergedWindows) {
- context.window = m;
- context.clear();
+ triggerContext.window = m;
+ triggerContext.clear();
deleteCleanupTimer(m);
}
@@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
evictingWindowState.add(element);
- context.key = key;
- context.window = actualWindow;
+ triggerContext.key = key;
+ triggerContext.window = actualWindow;
evictorContext.key = key;
evictorContext.window = actualWindow;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
- context.key = key;
- context.window = window;
+ triggerContext.key = key;
+ triggerContext.window = window;
evictorContext.key = key;
evictorContext.window = window;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
@@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
- evictingWindowState.setCurrentNamespace(context.window);
+ evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents, evictingWindowState);
+ emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
- if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, evictingWindowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
@@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
- evictingWindowState.setCurrentNamespace(context.window);
+ evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents, evictingWindowState);
+ emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
- if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, evictingWindowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
}
});
- userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+ processContext.window = triggerContext.window;
+ userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
@@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
W window,
ListState<StreamRecord<IN>> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
-
windowState.clear();
- context.clear();
+ triggerContext.clear();
+ processContext.window = window;
+ processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3745659..3d40716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
protected transient TimestampedCollector<OUT> timestampedCollector;
- protected transient Context context = new Context(null, null);
+ protected transient Context triggerContext = new Context(null, null);
+
+ protected transient WindowContext processContext = new WindowContext(null);
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
- context = new Context(null, null);
+ triggerContext = new Context(null, null);
+ processContext = new WindowContext( null);
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
@@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void close() throws Exception {
super.close();
timestampedCollector = null;
- context = null;
+ triggerContext = null;
+ processContext = null;
windowAssignerContext = null;
}
@@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void dispose() throws Exception {
super.dispose();
timestampedCollector = null;
- context = null;
+ triggerContext = null;
+ processContext = null;
windowAssignerContext = null;
}
@@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
" window: " + mergeResult);
}
- context.key = key;
- context.window = mergeResult;
+ triggerContext.key = key;
+ triggerContext.window = mergeResult;
- context.onMerge(mergedWindows);
+ triggerContext.onMerge(mergedWindows);
for (W m: mergedWindows) {
- context.window = m;
- context.clear();
+ triggerContext.window = m;
+ triggerContext.clear();
deleteCleanupTimer(m);
}
@@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
- context.key = key;
- context.window = actualWindow;
+ triggerContext.key = key;
+ triggerContext.window = actualWindow;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
@@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
- context.key = key;
- context.window = window;
+ triggerContext.key = key;
+ triggerContext.window = window;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
@@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
}
} else {
- windowState.setCurrentNamespace(context.window);
+ windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
@@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (contents != null) {
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents);
+ emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
- if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, windowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
}
} else {
- windowState.setCurrentNamespace(context.window);
+ windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
@@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (contents != null) {
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents);
+ emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
- if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, windowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@link Trigger#clear(Window, Trigger.TriggerContext)}.
*
* <p>The caller must ensure that the
- * correct key is set in the state backend and the context object.
+ * correct key is set in the state backend and the triggerContext object.
*/
private void clearAllState(
W window,
AppendingState<IN, ACC> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
- context.clear();
+ triggerContext.clear();
+ processContext.window = window;
+ processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
@@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@SuppressWarnings("unchecked")
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
- userFunction.apply(context.key, context.window, contents, timestampedCollector);
+ processContext.window = window;
+ userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
/**
@@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (windowAssigner.isEventTime()) {
- context.registerEventTimeTimer(cleanupTime);
+ triggerContext.registerEventTimeTimer(cleanupTime);
} else {
- context.registerProcessingTimeTimer(cleanupTime);
+ triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
@@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
return;
}
if (windowAssigner.isEventTime()) {
- context.deleteEventTimeTimer(cleanupTime);
+ triggerContext.deleteEventTimeTimer(cleanupTime);
} else {
- context.deleteProcessingTimeTimer(cleanupTime);
+ triggerContext.deleteProcessingTimeTimer(cleanupTime);
}
}
@@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
+ * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
+ * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+ */
+ public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+
+ // we have this in the base class even though it's not used in MergingKeyStore so that
+ // we can always set it and ignore what actual implementation we have
+ protected W window;
+ }
+
+ /**
+ * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
+ */
+ public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+ }
+
+ /**
+ * Regular per-window state store for use with
+ * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+ */
+ public class PerWindowStateStore extends AbstractPerWindowStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+ }
+
+ /**
+ * A utility class for handling {@code ProcessWindowFunction} invocations. This can be reused
+ * by setting the {@code key} and {@code window} fields. No internal state must be kept in the
+ * {@code WindowContext}.
+ */
+ public class WindowContext implements InternalWindowFunction.InternalWindowContext {
+ protected W window;
+
+ protected AbstractPerWindowStateStore windowState;
+
+ public WindowContext(W window) {
+ this.window = window;
+ this.windowState = windowAssigner instanceof MergingWindowAssigner ? new MergingWindowStateStore() : new PerWindowStateStore();
+ }
+
+ @Override
+ public String toString() {
+ return "WindowContext{Window = " + window.toString() + "}";
+ }
+
+ public void clear() throws Exception {
+ userFunction.clear(window, this);
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ this.windowState.window = this.window;
+ return this.windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return WindowOperator.this.getKeyedStateStore();
+ }
+ }
+
+ /**
* {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused
* by setting the {@code key} and {@code window} fields. No internal state must be kept in
* the {@code Context}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 9533c95..83e896d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -45,6 +46,8 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
private final AggregateFunction<T, ACC, V> aggFunction;
+ private transient InternalProcessAllWindowContext<V, R, W> ctx;
+
public InternalAggregateProcessAllWindowFunction(
AggregateFunction<T, ACC, V> aggFunction,
ProcessAllWindowFunction<V, R, W> windowFunction) {
@@ -53,22 +56,31 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
}
@Override
- public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
final ACC acc = aggFunction.createAccumulator();
for (T val : input) {
aggFunction.add(val, acc);
}
- wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index 433da9b..e14c9bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -46,30 +46,36 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
private final AggregateFunction<T, ACC, V> aggFunction;
+ private final InternalProcessWindowContext<V, R, K, W> ctx;
+
public InternalAggregateProcessWindowFunction(
AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction) {
super(windowFunction);
this.aggFunction = aggFunction;
+ this.ctx = new InternalProcessWindowContext<>(windowFunction);
}
-
- @Override
- public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
- ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ @Override
+ public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
final ACC acc = aggFunction.createAccumulator();
for (T val : input) {
aggFunction.add(val, acc);
}
- wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(key, ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 672bdb6..f2507ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
}
@Override
- public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void process(Byte key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(window, input, out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
index e33cc2a..47b7d55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -34,21 +35,33 @@ public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends W
private static final long serialVersionUID = 1L;
+ private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
}
@Override
- public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(ctx, input, out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
-
- wrappedFunction.process(context, input, out);
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
index de516a5..7eb015e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -34,21 +34,27 @@ public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends
private static final long serialVersionUID = 1L;
+ private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
+ this.ctx = new InternalProcessWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(key, ctx, input, out);
}
@Override
- public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
-
- wrappedFunction.process(key, context, input, out);
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 895b31f..e2f1517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
}
@Override
- public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(key, window, input, out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
new file mode 100644
index 0000000..c70e161
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
+ extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+ W window;
+ InternalWindowFunction.InternalWindowContext internalContext;
+
+ InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return internalContext.windowState();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return internalContext.globalState();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
new file mode 100644
index 0000000..0f1c0ee
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
+ extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+ W window;
+ InternalWindowFunction.InternalWindowContext internalContext;
+
+ InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return internalContext.windowState();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return internalContext.globalState();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index a34d3ec..e90bcf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
}
@Override
- public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(Byte key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(window, Collections.singletonList(input), out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
index 0284ef7..f7c6a08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -36,21 +37,33 @@ public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend
private static final long serialVersionUID = 1L;
+ private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
}
@Override
- public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ wrappedFunction.process(ctx, Collections.singletonList(input), out);
+ }
- wrappedFunction.process(context, Collections.singletonList(input), out);
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index 7a4e8c6..21d1639 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -36,21 +36,29 @@ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte
private static final long serialVersionUID = 1L;
+ private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
+ ctx = new InternalProcessWindowContext<>(wrappedFunction);
}
@Override
- public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+
ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ wrappedFunction.process(key, ctx, Collections.singletonList(input), out);
+ }
- wrappedFunction.process(key, context, Collections.singletonList(input), out);
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 9a0a447..d5cc4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
}
@Override
- public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(key, window, Collections.singletonList(input), out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 2eb4052..9834480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -29,15 +30,28 @@ import org.apache.flink.util.Collector;
* @param <KEY> The type of the key.
*/
public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function {
-
/**
* Evaluates the window and outputs none or several elements.
*
- * @param key The key for which this window is evaluated.
- * @param window The window that is being evaluated.
- * @param input The elements in the window being evaluated.
- * @param out A collector for emitting elements.
+ * @param context The context in which the window is being evaluated.
+ * @param input The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
- void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+ void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception;
+
+ /**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context to which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ void clear(W window, InternalWindowContext context) throws Exception;
+
+ interface InternalWindowContext extends java.io.Serializable {
+ KeyedStateStore windowState();
+
+ KeyedStateStore globalState();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 4b479f3..c4bed37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -45,8 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.util.Collector;
-import org.junit.Test;
import org.junit.Assert;
+import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
@@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest {
expected.add(initValue);
- foldWindowFunction.process(0, foldWindowFunction.new Context() {
+ FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
@Override
public TimeWindow window() {
return new TimeWindow(0, 1);
}
- }, input, new ListCollector<>(result));
+
+ @Override
+ public KeyedStateStore windowState() {
+ return new DummyKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return new DummyKeyedStateStore();
+ }
+ };
+
+ foldWindowFunction.open(new Configuration());
+
+ foldWindowFunction.process(0, ctx, input, new ListCollector<>(result));
Assert.assertEquals(expected, result);
}
@@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest {
expected.add(initValue);
- foldWindowFunction.process(foldWindowFunction.new Context() {
+ FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
@Override
public TimeWindow window() {
return new TimeWindow(0, 1);
}
- }, input, new ListCollector<>(result));
+
+ @Override
+ public KeyedStateStore windowState() {
+ return new DummyKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return new DummyKeyedStateStore();
+ }
+ };
+
+ foldWindowFunction.open(new Configuration());
+
+ foldWindowFunction.process(ctx, input, new ListCollector<>(result));
Assert.assertEquals(expected, result);
}
+ public static class DummyKeyedStateStore implements KeyedStateStore {
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ return null;
+ }
+ }
+
public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
public DummyStreamExecutionEnvironment() {
[3/3] flink git commit: [hotfix] Make GC test more strict in
WindowOperatorContractTest
Posted by al...@apache.org.
[hotfix] Make GC test more strict in WindowOperatorContractTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed33d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed33d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed33d
Branch: refs/heads/master
Commit: 662ed33d8f5baed95035b8176daf95a1caa0b278
Parents: fad201b
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 25 16:59:31 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 25 16:59:31 2017 +0100
----------------------------------------------------------------------
.../windowing/WindowOperatorContractTest.java | 21 ++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/662ed33d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index faab505..3ae8f37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1853,13 +1853,20 @@ public abstract class WindowOperatorContractTest extends TestLogger {
}
private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception {
+ long allowedLateness = 20L;
+
+ if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+ // we don't have allowed lateness for processing time
+ allowedLateness = 0;
+ }
+
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);
Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
- createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+ createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction);
testHarness.open();
@@ -1879,7 +1886,17 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
- timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+ // verify that we can still fire on the GC timer
+ timeAdaptor.shouldFireOnTime(mockTrigger);
+
+ timeAdaptor.advanceTime(testHarness, 19 + allowedLateness); // 19 is maxTime of the window
+
+ // ensure that our trigger is still called
+ timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 19L + allowedLateness, null);
+
+ // ensure that our window function is called a last timer if the trigger
+ // fires on the GC timer
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 20)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());