You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:38 UTC
[27/50] [abbrv] flink git commit: [FLINK-5929] Allow Access to
Per-Window State in ProcessWindowFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
index d4fefa2..6f34607 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/windowing/functions/InternalWindowFunctionTest.java
@@ -21,9 +21,11 @@ package org.apache.flink.streaming.api.operators.windowing.functions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.util.functions.StreamingFunctionUtils;
import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichProcessAllWindowFunction;
@@ -31,6 +33,7 @@ import org.apache.flink.streaming.api.functions.windowing.RichProcessWindowFunct
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableAllWindowFunction;
@@ -41,6 +44,7 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
+import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.Collector;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Test;
@@ -56,7 +60,11 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.hamcrest.core.AllOf.allOf;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.anyObject;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
public class InternalWindowFunctionTest {
@@ -93,7 +101,9 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(((byte)0), w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+
+ windowFunction.process(((byte)0), w, ctx, i, c);
verify(mock).apply(w, i, c);
// check close
@@ -134,7 +144,8 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(((byte)0), w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+ windowFunction.process(((byte)0), w, ctx, i, c);
verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
// check close
@@ -175,7 +186,8 @@ public class InternalWindowFunctionTest {
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
- windowFunction.apply(42L, w, i, c);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
+ windowFunction.process(42L, w, ctx, i, c);
verify(mock).apply(eq(42L), eq(w), eq(i), eq(c));
// check close
@@ -215,8 +227,9 @@ public class InternalWindowFunctionTest {
TimeWindow w = mock(TimeWindow.class);
Iterable<Long> i = (Iterable<Long>)mock(Iterable.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, i, c);
+ windowFunction.process(42L, w, ctx, i, c);
verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), eq(i), eq(c));
// check close
@@ -256,8 +269,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, 23L, c);
+ windowFunction.process(42L, w, ctx, 23L, c);
verify(mock).apply(eq(42L), eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -297,8 +311,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, 23L, c);
+ windowFunction.process(((byte)0), w, ctx, 23L, c);
verify(mock).apply(eq(w), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -338,8 +353,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, 23L, c);
+ windowFunction.process(((byte)0), w, ctx, 23L, c);
verify(mock).process((ProcessAllWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -378,8 +394,9 @@ public class InternalWindowFunctionTest {
// check apply
TimeWindow w = mock(TimeWindow.class);
Collector<String> c = (Collector<String>) mock(Collector.class);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, 23L, c);
+ windowFunction.process(42L, w, ctx,23L, c);
verify(mock).process(eq(42L), (ProcessWindowFunctionMock.Context) anyObject(), (Iterable<Long>)argThat(IsIterableContainingInOrder.contains(23L)), eq(c));
// check close
@@ -450,8 +467,9 @@ public class InternalWindowFunctionTest {
List<Long> args = new LinkedList<>();
args.add(23L);
args.add(24L);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(42L, w, args, c);
+ windowFunction.process(42L, w, ctx, args, c);
verify(mock).process(
eq(42L),
(AggregateProcessWindowFunctionMock.Context) anyObject(),
@@ -528,8 +546,9 @@ public class InternalWindowFunctionTest {
List<Long> args = new LinkedList<>();
args.add(23L);
args.add(24L);
+ InternalWindowFunction.InternalWindowContext ctx = mock(InternalWindowFunction.InternalWindowContext.class);
- windowFunction.apply(((byte)0), w, args, c);
+ windowFunction.process(((byte)0), w, ctx, args, c);
verify(mock).process(
(AggregateProcessAllWindowFunctionMock.Context) anyObject(),
(Iterable) argThat(containsInAnyOrder(allOf(
@@ -552,7 +571,9 @@ public class InternalWindowFunctionTest {
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
@Override
- public void process(Long aLong, Context context, Iterable<Long> input, Collector<String> out) throws Exception { }
+ public void process(Long aLong, ProcessWindowFunction<Long, String, Long, TimeWindow>.Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
+
+ }
}
public static class AggregateProcessWindowFunctionMock
@@ -565,7 +586,9 @@ public class InternalWindowFunctionTest {
public void setOutputType(TypeInformation<String> outTypeInfo, ExecutionConfig executionConfig) { }
@Override
- public void process(Long aLong, Context context, Iterable<Map<Long, Long>> input, Collector<String> out) throws Exception { }
+ public void process(Long aLong, ProcessWindowFunction<Map<Long, Long>, String, Long, TimeWindow>.Context context, Iterable<Map<Long, Long>> elements, Collector<String> out) throws Exception {
+
+ }
}
public static class AggregateProcessAllWindowFunctionMock
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
index 11508c5..ff1cbdf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/RegularWindowOperatorContractTest.java
@@ -118,9 +118,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
testHarness.processElement(new StreamRecord<>(1, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -182,9 +182,9 @@ public class RegularWindowOperatorContractTest extends WindowOperatorContractTes
testHarness.processElement(new StreamRecord<>(1, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(1), anyTimeWindow(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 2)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(2, 4)), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(1), anyTimeWindow(), anyInternalWindowContext(), anyInt(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), eq(3), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 8aae46a..faab505 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -163,6 +163,10 @@ public abstract class WindowOperatorContractTest extends TestLogger {
return Mockito.any();
}
+ static InternalWindowFunction.InternalWindowContext anyInternalWindowContext() {
+ return Mockito.any();
+ }
+
static Trigger.OnMergeContext anyOnMergeContext() {
return Mockito.any();
}
@@ -408,9 +412,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq((new TimeWindow(0, 2))), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
}
@Test
@@ -455,9 +459,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(1), eq(new TimeWindow(0, 1)), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0, 0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(1), eq(new TimeWindow(0, 1)), anyInternalWindowContext(), intIterable(1, 1), WindowOperatorContractTest.<Void>anyCollector());
}
@Test
@@ -509,16 +513,16 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
- Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ Collector<String> out = invocation.getArgumentAt(4, Collector.class);
out.collect("Hallo");
out.collect("Ciao");
return null;
}
- }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -553,25 +557,25 @@ public abstract class WindowOperatorContractTest extends TestLogger {
@Override
public Void answer(InvocationOnMock invocation) throws Exception {
@SuppressWarnings("unchecked")
- Collector<String> out = invocation.getArgumentAt(3, Collector.class);
+ Collector<String> out = invocation.getArgumentAt(4, Collector.class);
out.collect("Hallo");
out.collect("Ciao");
return null;
}
- }).when(mockWindowFunction).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ }).when(mockWindowFunction).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
timeAdaptor.shouldRegisterTimerOnElement(mockTrigger, 1);
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, never()).apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, never()).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<String>anyCollector());
assertTrue(testHarness.extractOutputStreamRecords().isEmpty());
timeAdaptor.shouldFireOnTime(mockTrigger);
timeAdaptor.advanceTime(testHarness, 1L);
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<String>anyCollector());
assertThat(testHarness.extractOutputStreamRecords(),
contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L)));
@@ -650,9 +654,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -693,9 +697,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -858,9 +862,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.advanceTime(testHarness, 0L);
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -919,9 +923,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.advanceTime(testHarness, 0L);
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -1050,7 +1054,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
}
@@ -1114,7 +1118,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, never(), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
assertEquals(1, timeAdaptor.numTimers(testHarness)); // only gc timers left
}
@@ -1186,7 +1190,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), null, null);
verify(mockWindowFunction, never())
- .apply(anyInt(), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
+ .process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<List<Integer>>anyCollector());
// now we trigger the dangling timer
timeAdaptor.advanceTime(testHarness, 10L);
@@ -2208,7 +2212,7 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testHarness.processElement(new StreamRecord<>(0, 0L));
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// clear is only called at cleanup time/GC time
verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext());
@@ -2326,9 +2330,9 @@ public abstract class WindowOperatorContractTest extends TestLogger {
verify(mockTrigger, times(2)).clear(anyTimeWindow(), anyTriggerContext());
- verify(mockWindowFunction, times(2)).apply(eq(0), anyTimeWindow(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
- verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(2, 4)), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(2)).process(eq(0), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 2)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
+ verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(2, 4)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector());
// it's also called for the cleanup timers
verify(mockTrigger, times(4)).onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext());
@@ -2339,6 +2343,96 @@ public abstract class WindowOperatorContractTest extends TestLogger {
assertEquals(0, testHarness.numEventTimeTimers());
}
+ @Test
+ public void testPerWindowStateSetAndClearedOnEventTimePurge() throws Exception {
+ testPerWindowStateSetAndClearedOnPurge(new EventTimeAdaptor());
+ }
+
+ @Test
+ public void testPerWindowStateSetAndClearedOnProcessingTimePurge() throws Exception {
+ testPerWindowStateSetAndClearedOnPurge(new ProcessingTimeAdaptor());
+ }
+
+ public void testPerWindowStateSetAndClearedOnPurge(TimeDomainAdaptor timeAdaptor) throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ timeAdaptor.setIsEventTime(mockAssigner);
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+ .thenReturn(TriggerResult.FIRE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ context.windowState().getState(valueStateDescriptor).update("hello");
+ return null;
+ }
+ }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[1];
+ context.windowState().getState(valueStateDescriptor).clear();
+ return null;
+ }
+ }).when(mockWindowFunction).clear(anyTimeWindow(), anyInternalWindowContext());
+
+ assertEquals(0, testHarness.getOutput().size());
+ assertEquals(0, testHarness.numKeyedStateEntries());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertEquals(2, testHarness.numKeyedStateEntries()); // window contents plus value state
+ assertEquals(1, timeAdaptor.numTimers(testHarness)); // gc timers
+
+ timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window
+
+ assertEquals(0, testHarness.numKeyedStateEntries());
+ assertEquals(0, timeAdaptor.numTimers(testHarness));
+ }
+
+ @Test
+ public void testWindowStateNotAvailableToMergingWindows() throws Exception {
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+ .thenReturn(TriggerResult.FIRE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext)invocationOnMock.getArguments()[2];
+ context.windowState().getState(valueStateDescriptor).update("hello");
+ return null;
+ }
+ }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+ expectedException.expect(UnsupportedOperationException.class);
+ expectedException.expectMessage("Per-window state is not allowed when using merging windows.");
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+ }
+
protected abstract <W extends Window, OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createWindowOperator(
WindowAssigner<Integer, W> assigner,
Trigger<Integer, W> trigger,
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
index 163117b..2f0e48e 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessAllWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -47,6 +48,15 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
def process(context: Context, elements: Iterable[IN], out: Collector[OUT])
/**
+ * Deletes any state in the [[Context]] when the Window is purged.
+ *
+ * @param context The context in which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def clear(context: Context) {}
+
+ /**
* The context holding window metadata
*/
abstract class Context {
@@ -54,6 +64,16 @@ abstract class ProcessAllWindowFunction[IN, OUT, W <: Window] extends Function w
* @return The window that is being evaluated.
*/
def window: W
+
+ /**
+ * State accessor for per-key and per-window state.
+ */
+ def windowState: KeyedStateStore
+
+ /**
+ * State accessor for per-key global state.
+ */
+ def globalState: KeyedStateStore
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index 79f3918..bdf6ae6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -22,6 +22,7 @@ import java.io.Serializable
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.Function
+import org.apache.flink.api.common.state.KeyedStateStore
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -49,6 +50,15 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT])
/**
+ * Deletes any state in the [[Context]] when the Window is purged.
+ *
+ * @param context The context in which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ @throws[Exception]
+ def clear(context: Context) {}
+
+ /**
* The context holding window metadata
*/
abstract class Context {
@@ -56,6 +66,16 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
* @return The window that is being evaluated.
*/
def window: W
+
+ /**
+ * State accessor for per-key and per-window state.
+ */
+ def windowState: KeyedStateStore
+
+ /**
+ * State accessor for per-key global state.
+ */
+ def globalState: KeyedStateStore
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index a4fec64..fac5958 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -52,10 +52,25 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
out: Collector[OUT]): Unit = {
val ctx = new func.Context {
override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
}
func.process(key, ctx, elements.asScala, out)
}
+ override def clear(context: JProcessWindowFunction[IN, OUT, KEY, W]#Context): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
+ }
+ func.clear(ctx)
+ }
+
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {
@@ -99,10 +114,26 @@ final class ScalaProcessAllWindowFunctionWrapper[IN, OUT, W <: Window](
out: Collector[OUT]): Unit = {
val ctx = new func.Context {
override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
}
func.process(ctx, elements.asScala, out)
}
+ override def clear(context: JProcessAllWindowFunction[IN, OUT, W]#Context): Unit = {
+ val ctx = new func.Context {
+ override def window = context.window
+
+ override def windowState = context.windowState()
+
+ override def globalState = context.globalState()
+ }
+ func.clear(ctx)
+ }
+
+
override def setRuntimeContext(t: RuntimeContext): Unit = {
super.setRuntimeContext(t)
func match {