You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/25 16:01:37 UTC
[2/3] flink git commit: [FLINK-5929] Allow Access to Per-Window State
in ProcessWindowFunction
[FLINK-5929] Allow Access to Per-Window State in ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fad201bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fad201bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fad201bf
Branch: refs/heads/master
Commit: fad201bfb0b1f2757f68f7b3ffaf97a486eb93e8
Parents: 5c37e55
Author: Seth Wiesman <sw...@mediamath.com>
Authored: Sun Mar 5 23:07:18 2017 -0500
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 25 16:59:17 2017 +0100
----------------------------------------------------------------------
.../FoldApplyProcessAllWindowFunction.java | 23 +-
.../FoldApplyProcessWindowFunction.java | 23 +-
.../InternalProcessApplyAllWindowContext.java | 57 +++++
.../InternalProcessApplyWindowContext.java | 58 +++++
.../windowing/ProcessAllWindowFunction.java | 22 ++
.../windowing/ProcessWindowFunction.java | 24 +-
.../ReduceApplyProcessAllWindowFunction.java | 23 +-
.../ReduceApplyProcessWindowFunction.java | 21 +-
.../windowing/AccumulatingKeyedTimePanes.java | 75 ++++++-
.../windowing/EvictingWindowOperator.java | 62 +++---
.../operators/windowing/WindowOperator.java | 220 +++++++++++++++----
...ternalAggregateProcessAllWindowFunction.java | 28 ++-
.../InternalAggregateProcessWindowFunction.java | 28 ++-
.../InternalIterableAllWindowFunction.java | 7 +-
...nternalIterableProcessAllWindowFunction.java | 31 ++-
.../InternalIterableProcessWindowFunction.java | 24 +-
.../InternalIterableWindowFunction.java | 7 +-
.../InternalProcessAllWindowContext.java | 57 +++++
.../functions/InternalProcessWindowContext.java | 58 +++++
.../InternalSingleValueAllWindowFunction.java | 7 +-
...rnalSingleValueProcessAllWindowFunction.java | 29 ++-
...nternalSingleValueProcessWindowFunction.java | 24 +-
.../InternalSingleValueWindowFunction.java | 7 +-
.../functions/InternalWindowFunction.java | 26 ++-
.../FoldApplyProcessWindowFunctionTest.java | 82 ++++++-
.../functions/InternalWindowFunctionTest.java | 49 +++--
.../RegularWindowOperatorContractTest.java | 12 +-
.../windowing/WindowOperatorContractTest.java | 158 ++++++++++---
.../function/ProcessAllWindowFunction.scala | 20 ++
.../scala/function/ProcessWindowFunction.scala | 20 ++
.../ScalaProcessWindowFunctionWrapper.scala | 31 +++
31 files changed, 1091 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
index 5ac6766..8e8e52c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessAllWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
private TypeSerializer<ACC> accSerializer;
private final TypeInformation<ACC> accTypeInformation;
private transient ACC initialValue;
+ private transient InternalProcessApplyAllWindowContext<ACC, R, W> ctx;
public FoldApplyProcessAllWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessAllWindowFunction<ACC, R, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
this.windowFunction = windowFunction;
@@ -70,6 +71,9 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
+
+ ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
+
}
@Override
@@ -92,12 +96,19 @@ public class FoldApplyProcessAllWindowFunction<W extends Window, T, ACC, R>
result = foldFunction.fold(result, val);
}
- windowFunction.process(windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(result), out);
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.process(ctx, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
index e1bc759..073a2f3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyProcessWindowFunction.java
@@ -50,6 +50,7 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
private TypeSerializer<ACC> accSerializer;
private final TypeInformation<ACC> accTypeInformation;
private transient ACC initialValue;
+ private transient InternalProcessApplyWindowContext<ACC, R, K, W> ctx;
public FoldApplyProcessWindowFunction(ACC initialValue, FoldFunction<T, ACC> foldFunction, ProcessWindowFunction<ACC, R, K, W> windowFunction, TypeInformation<ACC> accTypeInformation) {
this.windowFunction = windowFunction;
@@ -70,6 +71,8 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue);
DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
initialValue = accSerializer.deserialize(in);
+
+ ctx = new InternalProcessApplyWindowContext<>(windowFunction);
}
@Override
@@ -85,19 +88,25 @@ public class FoldApplyProcessWindowFunction<K, W extends Window, T, ACC, R>
}
@Override
- public void process(K key, final Context context, Iterable<T> values, Collector<R> out) throws Exception {
+ public void process(K key, Context context, Iterable<T> values, Collector<R> out) throws Exception {
ACC result = accSerializer.copy(initialValue);
for (T val : values) {
result = foldFunction.fold(result, val);
}
- windowFunction.process(key, windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(result), out);
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.process(key, ctx, Collections.singletonList(result), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception{
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
new file mode 100644
index 0000000..e1a0a98
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyAllWindowContext.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyAllWindowContext<IN, OUT, W extends Window>
+ extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+ W window;
+ KeyedStateStore windowState;
+ KeyedStateStore globalState;
+
+ InternalProcessApplyAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
new file mode 100644
index 0000000..f547adc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.windowing;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable context wrapper.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the window key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
+ extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+ W window;
+ KeyedStateStore windowState;
+ KeyedStateStore globalState;
+
+ InternalProcessApplyWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
index 622e020..f49aa27 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -48,6 +49,14 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
public abstract void process(Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context in which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public void clear(Context context) throws Exception {}
+
+ /**
* The context holding window metadata
*/
public abstract class Context {
@@ -55,5 +64,18 @@ public abstract class ProcessAllWindowFunction<IN, OUT, W extends Window> implem
* @return The window that is being evaluated.
*/
public abstract W window();
+
+ /**
+ * State accessor for per-key and per-window state.
+ *
+ * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
+ * by implementing {@link ProcessAllWindowFunction#clear(Context)}.
+ */
+ public abstract KeyedStateStore windowState();
+
+ /**
+ * State accessor for per-key global state.
+ */
+ public abstract KeyedStateStore globalState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 9c48e24..bcefaf7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -50,12 +51,33 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> impl
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context in which the window is being evaluated
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ public void clear(Context context) throws Exception {}
+
+ /**
* The context holding window metadata
*/
- public abstract class Context {
+ public abstract class Context implements java.io.Serializable {
/**
* @return The window that is being evaluated.
*/
public abstract W window();
+
+ /**
+ * State accessor for per-key and per-window state.
+ *
+ * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
+ * by implementing {@link ProcessWindowFunction#clear(Context)}.
+ */
+ public abstract KeyedStateStore windowState();
+
+ /**
+ * State accessor for per-key global state.
+ */
+ public abstract KeyedStateStore globalState();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
index 142c71e..4c54c94 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessAllWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
private final ReduceFunction<T> reduceFunction;
private final ProcessAllWindowFunction<T, R, W> windowFunction;
+ private transient InternalProcessApplyAllWindowContext<T, R, W> ctx;
public ReduceApplyProcessAllWindowFunction(ReduceFunction<T> reduceFunction, ProcessAllWindowFunction<T, R, W> windowFunction) {
this.windowFunction = windowFunction;
@@ -52,17 +53,27 @@ public class ReduceApplyProcessAllWindowFunction<W extends Window, T, R>
curr = reduceFunction.reduce(curr, val);
}
}
- windowFunction.process(windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(curr), out);
+
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.process(ctx, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+
+ windowFunction.clear(ctx);
}
@Override
public void open(Configuration configuration) throws Exception {
FunctionUtils.openFunction(this.windowFunction, configuration);
+ ctx = new InternalProcessApplyAllWindowContext<>(windowFunction);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
index 9ea1fdf..1af783a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ReduceApplyProcessWindowFunction.java
@@ -35,6 +35,7 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
private final ReduceFunction<T> reduceFunction;
private final ProcessWindowFunction<T, R, K, W> windowFunction;
+ private transient InternalProcessApplyWindowContext<T, R, K, W> ctx;
public ReduceApplyProcessWindowFunction(ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> windowFunction) {
this.windowFunction = windowFunction;
@@ -52,17 +53,25 @@ public class ReduceApplyProcessWindowFunction<K, W extends Window, T, R>
curr = reduceFunction.reduce(curr, val);
}
}
- windowFunction.process(k, windowFunction.new Context() {
- @Override
- public W window() {
- return context.window();
- }
- }, Collections.singletonList(curr), out);
+
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.process(k, ctx, Collections.singletonList(curr), out);
+ }
+
+ @Override
+ public void clear(final Context context) throws Exception {
+ this.ctx.window = context.window();
+ this.ctx.windowState = context.windowState();
+ this.ctx.globalState = context.globalState();
+ windowFunction.clear(ctx);
}
@Override
public void open(Configuration configuration) throws Exception {
FunctionUtils.openFunction(this.windowFunction, configuration);
+ ctx = new InternalProcessApplyWindowContext<>(windowFunction);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 87c5aca..d58b5cc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -19,6 +19,17 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.util.UnionIterator;
@@ -38,6 +49,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final InternalWindowFunction<Iterable<Type>, Result, Key, Window> function;
+ private final AccumulatingKeyedTimePanesContext context;
+
/**
* IMPORTANT: This value needs to start at one, so it is fresher than the value that new entries have (zero) */
private long evaluationPass = 1L;
@@ -47,6 +60,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
public AccumulatingKeyedTimePanes(KeySelector<Type, Key> keySelector, InternalWindowFunction<Iterable<Type>, Result, Key, Window> function) {
this.keySelector = keySelector;
this.function = function;
+ this.context = new AccumulatingKeyedTimePanesContext();
}
// ------------------------------------------------------------------------
@@ -67,13 +81,15 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
Key key = entry.getKey();
operator.setCurrentKey(key);
- function.apply(entry.getKey(), window, entry.getValue(), out);
+ context.globalState = operator.getKeyedStateStore();
+
+ function.process(entry.getKey(), window, context, entry.getValue(), out);
}
}
else {
// general code path for multi-pane case
WindowFunctionTraversal<Key, Type, Result> evaluator = new WindowFunctionTraversal<>(
- function, window, out, operator);
+ function, window, out, operator, context);
traverseAllPanes(evaluator, evaluationPass);
}
@@ -95,17 +111,19 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
private final TimeWindow window;
private final AbstractStreamOperator<Result> contextOperator;
-
+
private Key currentKey;
+ private AccumulatingKeyedTimePanesContext context;
WindowFunctionTraversal(InternalWindowFunction<Iterable<Type>, Result, Key, Window> function, TimeWindow window,
- Collector<Result> out, AbstractStreamOperator<Result> contextOperator) {
+ Collector<Result> out, AbstractStreamOperator<Result> contextOperator, AccumulatingKeyedTimePanesContext context) {
this.function = function;
this.out = out;
this.unionIterator = new UnionIterator<>();
this.window = window;
this.contextOperator = contextOperator;
+ this.context = context;
}
@@ -123,7 +141,8 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
@Override
public void keyDone() throws Exception {
contextOperator.setCurrentKey(currentKey);
- function.apply(currentKey, window, unionIterator, out);
+ context.globalState = contextOperator.getKeyedStateStore();
+ function.process(currentKey, window, context, unionIterator, out);
}
}
@@ -136,6 +155,52 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
return (KeyMap.LazyFactory<ArrayList<V>>) LIST_FACTORY;
}
+ private static class ThrowingKeyedStateStore implements KeyedStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not supported when using aligned processing-time windows.");
+ }
+ }
+
+ private static class AccumulatingKeyedTimePanesContext implements InternalWindowFunction.InternalWindowContext {
+ KeyedStateStore globalState;
+ KeyedStateStore throwingStore;
+
+ public AccumulatingKeyedTimePanesContext() {
+ this.throwingStore = new ThrowingKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return throwingStore;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return globalState;
+ }
+ }
+
private static final KeyMap.LazyFactory<?> LIST_FACTORY = new KeyMap.LazyFactory<ArrayList<?>>() {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 24c8d32..85451a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -134,14 +134,14 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
" window: " + mergeResult);
}
- context.key = key;
- context.window = mergeResult;
+ triggerContext.key = key;
+ triggerContext.window = mergeResult;
- context.onMerge(mergedWindows);
+ triggerContext.onMerge(mergedWindows);
for (W m : mergedWindows) {
- context.window = m;
- context.clear();
+ triggerContext.window = m;
+ triggerContext.clear();
deleteCleanupTimer(m);
}
@@ -165,12 +165,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
evictingWindowState.add(element);
- context.key = key;
- context.window = actualWindow;
+ triggerContext.key = key;
+ triggerContext.window = actualWindow;
evictorContext.key = key;
evictorContext.window = actualWindow;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -201,12 +201,12 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
- context.key = key;
- context.window = window;
+ triggerContext.key = key;
+ triggerContext.window = window;
evictorContext.key = key;
evictorContext.window = window;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
@@ -236,8 +236,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
@@ -245,7 +245,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -255,23 +255,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
- evictingWindowState.setCurrentNamespace(context.window);
+ evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents, evictingWindowState);
+ emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
- if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, evictingWindowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -282,8 +282,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
evictorContext.key = timer.getKey();
evictorContext.window = timer.getNamespace();
@@ -291,7 +291,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -301,23 +301,23 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
evictingWindowState.setCurrentNamespace(stateWindow);
}
} else {
- evictingWindowState.setCurrentNamespace(context.window);
+ evictingWindowState.setCurrentNamespace(triggerContext.window);
}
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents != null) {
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents, evictingWindowState);
+ emitWindowContents(triggerContext.window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
}
- if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, evictingWindowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -348,7 +348,8 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
}
});
- userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+ processContext.window = triggerContext.window;
+ userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector);
evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
@@ -364,9 +365,10 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
W window,
ListState<StreamRecord<IN>> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
-
windowState.clear();
- context.clear();
+ triggerContext.clear();
+ processContext.window = window;
+ processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3745659..3d40716 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -23,8 +23,16 @@ import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -159,7 +167,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
/** This is given to the {@code InternalWindowFunction} for emitting elements with a given timestamp. */
protected transient TimestampedCollector<OUT> timestampedCollector;
- protected transient Context context = new Context(null, null);
+ protected transient Context triggerContext = new Context(null, null);
+
+ protected transient WindowContext processContext = new WindowContext(null);
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -264,7 +274,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
- context = new Context(null, null);
+ triggerContext = new Context(null, null);
+ processContext = new WindowContext( null);
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
@@ -317,7 +328,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void close() throws Exception {
super.close();
timestampedCollector = null;
- context = null;
+ triggerContext = null;
+ processContext = null;
windowAssignerContext = null;
}
@@ -325,7 +337,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public void dispose() throws Exception {
super.dispose();
timestampedCollector = null;
- context = null;
+ triggerContext = null;
+ processContext = null;
windowAssignerContext = null;
}
@@ -365,14 +378,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
" window: " + mergeResult);
}
- context.key = key;
- context.window = mergeResult;
+ triggerContext.key = key;
+ triggerContext.window = mergeResult;
- context.onMerge(mergedWindows);
+ triggerContext.onMerge(mergedWindows);
for (W m: mergedWindows) {
- context.window = m;
- context.clear();
+ triggerContext.window = m;
+ triggerContext.clear();
deleteCleanupTimer(m);
}
@@ -396,10 +409,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
- context.key = key;
- context.window = actualWindow;
+ triggerContext.key = key;
+ triggerContext.window = actualWindow;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
@@ -429,10 +442,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
- context.key = key;
- context.window = window;
+ triggerContext.key = key;
+ triggerContext.window = window;
- TriggerResult triggerResult = context.onElement(element);
+ TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
@@ -460,14 +473,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -477,7 +490,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
}
} else {
- windowState.setCurrentNamespace(context.window);
+ windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
@@ -487,17 +500,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (contents != null) {
- TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents);
+ emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
- if (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, windowState, mergingWindows);
+ if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -508,14 +521,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
- context.key = timer.getKey();
- context.window = timer.getNamespace();
+ triggerContext.key = timer.getKey();
+ triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
+ W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
@@ -525,7 +538,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
windowState.setCurrentNamespace(stateWindow);
}
} else {
- windowState.setCurrentNamespace(context.window);
+ windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
@@ -535,17 +548,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (contents != null) {
- TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());
if (triggerResult.isFire()) {
- emitWindowContents(context.window, contents);
+ emitWindowContents(triggerContext.window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
}
- if (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp())) {
- clearAllState(context.window, windowState, mergingWindows);
+ if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
+ clearAllState(triggerContext.window, windowState, mergingWindows);
}
if (mergingWindows != null) {
@@ -559,14 +572,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@link Trigger#clear(Window, Trigger.TriggerContext)}.
*
* <p>The caller must ensure that the
- * correct key is set in the state backend and the context object.
+ * correct key is set in the state backend and the triggerContext object.
*/
private void clearAllState(
W window,
AppendingState<IN, ACC> windowState,
MergingWindowSet<W> mergingWindows) throws Exception {
windowState.clear();
- context.clear();
+ triggerContext.clear();
+ processContext.window = window;
+ processContext.clear();
if (mergingWindows != null) {
mergingWindows.retireWindow(window);
mergingWindows.persist();
@@ -579,7 +594,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@SuppressWarnings("unchecked")
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
- userFunction.apply(context.key, context.window, contents, timestampedCollector);
+ processContext.window = window;
+ userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
/**
@@ -636,9 +652,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
if (windowAssigner.isEventTime()) {
- context.registerEventTimeTimer(cleanupTime);
+ triggerContext.registerEventTimeTimer(cleanupTime);
} else {
- context.registerProcessingTimeTimer(cleanupTime);
+ triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
@@ -654,9 +670,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
return;
}
if (windowAssigner.isEventTime()) {
- context.deleteEventTimeTimer(cleanupTime);
+ triggerContext.deleteEventTimeTimer(cleanupTime);
} else {
- context.deleteProcessingTimeTimer(cleanupTime);
+ triggerContext.deleteProcessingTimeTimer(cleanupTime);
}
}
@@ -686,6 +702,134 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
+ * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
+ * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+ */
+ public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+
+ // we have this in the base class even though it's not used in MergingWindowStateStore so
+ // that we can always set it and ignore what actual implementation we have
+ protected W window;
+ }
+
+ /**
+ * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
+ */
+ public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <T, A> FoldingState<T, A> getFoldingState(FoldingStateDescriptor<T, A> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
+ }
+ }
+
+ /**
+ * Regular per-window state store for use with
+ * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
+ */
+ public class PerWindowStateStore extends AbstractPerWindowStateStore {
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ try {
+ return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not retrieve state", e);
+ }
+ }
+ }
+
+ /**
+ * A utility class for handling {@code ProcessWindowFunction} invocations. This can be reused
+ * by setting the {@code window} field. No internal state must be kept in the
+ * {@code WindowContext}.
+ */
+ public class WindowContext implements InternalWindowFunction.InternalWindowContext {
+ protected W window;
+
+ protected AbstractPerWindowStateStore windowState;
+
+ public WindowContext(W window) {
+ this.window = window;
+ this.windowState = windowAssigner instanceof MergingWindowAssigner ? new MergingWindowStateStore() : new PerWindowStateStore();
+ }
+
+ @Override
+ public String toString() {
+ return "WindowContext{Window = " + window.toString() + "}";
+ }
+
+ public void clear() throws Exception {
+ userFunction.clear(window, this);
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ this.windowState.window = this.window;
+ return this.windowState;
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return WindowOperator.this.getKeyedStateStore();
+ }
+ }
+
+ /**
* {@code Context} is a utility for handling {@code Trigger} invocations. It can be reused
* by setting the {@code key} and {@code window} fields. No internal state must be kept in
* the {@code Context}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
index 9533c95..83e896d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessAllWindowFunction.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -45,6 +46,8 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
private final AggregateFunction<T, ACC, V> aggFunction;
+ private transient InternalProcessAllWindowContext<V, R, W> ctx;
+
public InternalAggregateProcessAllWindowFunction(
AggregateFunction<T, ACC, V> aggFunction,
ProcessAllWindowFunction<V, R, W> windowFunction) {
@@ -53,22 +56,31 @@ public final class InternalAggregateProcessAllWindowFunction<T, ACC, V, R, W ext
}
@Override
- public void apply(Byte key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<V, R, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
final ACC acc = aggFunction.createAccumulator();
for (T val : input) {
aggFunction.add(val, acc);
}
- wrappedFunction.process(context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<V, R, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
index 433da9b..e14c9bd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalAggregateProcessWindowFunction.java
@@ -46,30 +46,36 @@ public final class InternalAggregateProcessWindowFunction<T, ACC, V, R, K, W ext
private final AggregateFunction<T, ACC, V> aggFunction;
+ private final InternalProcessWindowContext<V, R, K, W> ctx;
+
public InternalAggregateProcessWindowFunction(
AggregateFunction<T, ACC, V> aggFunction,
ProcessWindowFunction<V, R, K, W> windowFunction) {
super(windowFunction);
this.aggFunction = aggFunction;
+ this.ctx = new InternalProcessWindowContext<>(windowFunction);
}
-
- @Override
- public void apply(K key, final W window, Iterable<T> input, Collector<R> out) throws Exception {
- ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<V, R, K, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ @Override
+ public void process(K key, final W window, final InternalWindowContext context, Iterable<T> input, Collector<R> out) throws Exception {
final ACC acc = aggFunction.createAccumulator();
for (T val : input) {
aggFunction.add(val, acc);
}
- wrappedFunction.process(key, context, Collections.singletonList(aggFunction.getResult(acc)), out);
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(key, ctx, Collections.singletonList(aggFunction.getResult(acc)), out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<V, R, K, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
index 672bdb6..f2507ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableAllWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableAllWindowFunction<IN, OUT, W extends Window>
}
@Override
- public void apply(Byte key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void process(Byte key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(window, input, out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
index e33cc2a..47b7d55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -34,21 +35,33 @@ public final class InternalIterableProcessAllWindowFunction<IN, OUT, W extends W
private static final long serialVersionUID = 1L;
+ private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
public InternalIterableProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
}
@Override
- public void apply(Byte key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(ctx, input, out);
+ }
+
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
-
- wrappedFunction.process(context, input, out);
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
index de516a5..7eb015e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableProcessWindowFunction.java
@@ -34,21 +34,27 @@ public final class InternalIterableProcessWindowFunction<IN, OUT, KEY, W extends
private static final long serialVersionUID = 1L;
+ private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
public InternalIterableProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
+ this.ctx = new InternalProcessWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(KEY key, final W window, final InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.process(key, ctx, input, out);
}
@Override
- public void apply(KEY key, final W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
-
- wrappedFunction.process(key, context, input, out);
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 895b31f..e2f1517 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -39,11 +39,16 @@ public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window
}
@Override
- public void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, W window, InternalWindowContext context, Iterable<IN> input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(key, window, input, out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
new file mode 100644
index 0000000..c70e161
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessAllWindowContext.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable {@link ProcessAllWindowFunction} context that wraps an {@code InternalWindowContext}.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessAllWindowContext<IN, OUT, W extends Window>
+ extends ProcessAllWindowFunction<IN, OUT, W>.Context {
+
+ W window;
+ InternalWindowFunction.InternalWindowContext internalContext;
+
+ InternalProcessAllWindowContext(ProcessAllWindowFunction<IN, OUT, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return internalContext.windowState();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return internalContext.globalState();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
new file mode 100644
index 0000000..0f1c0ee
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * Internal reusable {@link ProcessWindowFunction} context that wraps an {@code InternalWindowContext}.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of the window.
+ */
+@Internal
+public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
+ extends ProcessWindowFunction<IN, OUT, KEY, W>.Context {
+
+ W window;
+ InternalWindowFunction.InternalWindowContext internalContext;
+
+ InternalProcessWindowContext(ProcessWindowFunction<IN, OUT, KEY, W> function) {
+ function.super();
+ }
+
+ @Override
+ public W window() {
+ return window;
+ }
+
+ @Override
+ public KeyedStateStore windowState() {
+ return internalContext.windowState();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return internalContext.globalState();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
index a34d3ec..e90bcf4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueAllWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueAllWindowFunction<IN, OUT, W extends Windo
}
@Override
- public void apply(Byte key, W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(Byte key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(window, Collections.singletonList(input), out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
index 0284ef7..f7c6a08 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessAllWindowFunction.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -36,21 +37,33 @@ public final class InternalSingleValueProcessAllWindowFunction<IN, OUT, W extend
private static final long serialVersionUID = 1L;
+ private transient InternalProcessAllWindowContext<IN, OUT, W> ctx;
+
public InternalSingleValueProcessAllWindowFunction(ProcessAllWindowFunction<IN, OUT, W> wrappedFunction) {
super(wrappedFunction);
}
@Override
- public void apply(Byte key, final W window, IN input, Collector<OUT> out) throws Exception {
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ this.ctx = new InternalProcessAllWindowContext<>(wrappedFunction);
+ }
+
+ @Override
+ public void process(Byte key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
- ProcessAllWindowFunction<IN, OUT, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ wrappedFunction.process(ctx, Collections.singletonList(input), out);
+ }
- wrappedFunction.process(context, Collections.singletonList(input), out);
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+ ProcessAllWindowFunction<IN, OUT, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
index 7a4e8c6..21d1639 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueProcessWindowFunction.java
@@ -36,21 +36,29 @@ public final class InternalSingleValueProcessWindowFunction<IN, OUT, KEY, W exte
private static final long serialVersionUID = 1L;
+ private final InternalProcessWindowContext<IN, OUT, KEY, W> ctx;
+
public InternalSingleValueProcessWindowFunction(ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction) {
super(wrappedFunction);
+ ctx = new InternalProcessWindowContext<>(wrappedFunction);
}
@Override
- public void apply(KEY key, final W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, final W window, final InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+
ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
- ProcessWindowFunction<IN, OUT, KEY, W>.Context context = wrappedFunction.new Context() {
- @Override
- public W window() {
- return window;
- }
- };
+ wrappedFunction.process(key, ctx, Collections.singletonList(input), out);
+ }
- wrappedFunction.process(key, context, Collections.singletonList(input), out);
+ @Override
+ public void clear(final W window, final InternalWindowContext context) throws Exception {
+ this.ctx.window = window;
+ this.ctx.internalContext = context;
+
+ ProcessWindowFunction<IN, OUT, KEY, W> wrappedFunction = this.wrappedFunction;
+ wrappedFunction.clear(ctx);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
index 9a0a447..d5cc4a2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalSingleValueWindowFunction.java
@@ -41,11 +41,16 @@ public final class InternalSingleValueWindowFunction<IN, OUT, KEY, W extends Win
}
@Override
- public void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception {
+ public void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception {
wrappedFunction.apply(key, window, Collections.singletonList(input), out);
}
@Override
+ public void clear(W window, InternalWindowContext context) throws Exception {
+
+ }
+
+ @Override
public RuntimeContext getRuntimeContext() {
throw new RuntimeException("This should never be called.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 2eb4052..9834480 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.runtime.operators.windowing.functions;
import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -29,15 +30,28 @@ import org.apache.flink.util.Collector;
* @param <KEY> The type of the key.
*/
public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends Function {
-
/**
* Evaluates the window and outputs none or several elements.
*
- * @param key The key for which this window is evaluated.
- * @param window The window that is being evaluated.
- * @param input The elements in the window being evaluated.
- * @param out A collector for emitting elements.
+ * @param context The context in which the window is being evaluated.
+ * @param input The elements in the window being evaluated.
+ * @param out A collector for emitting elements.
+ *
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
- void apply(KEY key, W window, IN input, Collector<OUT> out) throws Exception;
+ void process(KEY key, W window, InternalWindowContext context, IN input, Collector<OUT> out) throws Exception;
+
+ /**
+ * Deletes any state in the {@code Context} when the Window is purged.
+ *
+ * @param context The context in which the window is being evaluated.
+ * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
+ */
+ void clear(W window, InternalWindowContext context) throws Exception;
+
+ interface InternalWindowContext extends java.io.Serializable {
+ KeyedStateStore windowState();
+
+ KeyedStateStore globalState();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fad201bf/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
index 4b479f3..c4bed37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyProcessWindowFunctionTest.java
@@ -21,20 +21,28 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.FoldApplyProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.FoldApplyWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
-import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -45,8 +53,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProces
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessAllWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.util.Collector;
-import org.junit.Test;
import org.junit.Assert;
+import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
@@ -139,12 +147,26 @@ public class FoldApplyProcessWindowFunctionTest {
expected.add(initValue);
- foldWindowFunction.process(0, foldWindowFunction.new Context() {
+ FoldApplyProcessWindowFunction<Integer, TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
@Override
public TimeWindow window() {
return new TimeWindow(0, 1);
}
- }, input, new ListCollector<>(result));
+
+ @Override
+ public KeyedStateStore windowState() {
+ return new DummyKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return new DummyKeyedStateStore();
+ }
+ };
+
+ foldWindowFunction.open(new Configuration());
+
+ foldWindowFunction.process(0, ctx, input, new ListCollector<>(result));
Assert.assertEquals(expected, result);
}
@@ -234,16 +256,58 @@ public class FoldApplyProcessWindowFunctionTest {
expected.add(initValue);
- foldWindowFunction.process(foldWindowFunction.new Context() {
+ FoldApplyProcessAllWindowFunction<TimeWindow, Integer, Integer, Integer>.Context ctx = foldWindowFunction.new Context() {
@Override
public TimeWindow window() {
return new TimeWindow(0, 1);
}
- }, input, new ListCollector<>(result));
+
+ @Override
+ public KeyedStateStore windowState() {
+ return new DummyKeyedStateStore();
+ }
+
+ @Override
+ public KeyedStateStore globalState() {
+ return new DummyKeyedStateStore();
+ }
+ };
+
+ foldWindowFunction.open(new Configuration());
+
+ foldWindowFunction.process(ctx, input, new ListCollector<>(result));
Assert.assertEquals(expected, result);
}
+ public static class DummyKeyedStateStore implements KeyedStateStore {
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ return null;
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ return null;
+ }
+ }
+
public static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
public DummyStreamExecutionEnvironment() {