You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/05/16 12:49:27 UTC
flink git commit: [FLINK-9174] Make type of state created in
ProcessWindowFunction.process() consistent
Repository: flink
Updated Branches:
refs/heads/master 0b5e124ba -> 63740e7c5
[FLINK-9174] Make type of state created in ProcessWindowFunction.process() consistent
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63740e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63740e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63740e7c
Branch: refs/heads/master
Commit: 63740e7c51800599d6d4d77bb7762600df2bad88
Parents: 0b5e124
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 14 18:02:24 2018 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 16 14:47:40 2018 +0200
----------------------------------------------------------------------
.../runtime/state/DefaultKeyedStateStore.java | 6 +-
.../operators/windowing/WindowOperator.java | 75 ++++---------
.../windowing/WindowOperatorContractTest.java | 110 +++++++++++++++++++
3 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index e0bb7b7..b970081 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -44,8 +44,8 @@ import static java.util.Objects.requireNonNull;
*/
public class DefaultKeyedStateStore implements KeyedStateStore {
- private final KeyedStateBackend<?> keyedStateBackend;
- private final ExecutionConfig executionConfig;
+ protected final KeyedStateBackend<?> keyedStateBackend;
+ protected final ExecutionConfig executionConfig;
public DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
@@ -120,7 +120,7 @@ public class DefaultKeyedStateStore implements KeyedStateStore {
}
}
- private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
+ protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
return keyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9e3898c..ecce1fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.AppendingState;
@@ -45,6 +46,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalAppendingState;
@@ -163,7 +166,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
protected transient Context triggerContext = new Context(null, null);
- protected transient WindowContext processContext = new WindowContext(null);
+ protected transient WindowContext processContext;
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -654,17 +657,26 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
* state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
*/
- public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+ public abstract class AbstractPerWindowStateStore extends DefaultKeyedStateStore {
// we have this in the base class even though it's not used in MergingKeyStore so that
// we can always set it and ignore what actual implementation we have
protected W window;
+
+ public AbstractPerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+ super(keyedStateBackend, executionConfig);
+ }
}
/**
* Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
*/
public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+
+ public MergingWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+ super(keyedStateBackend, executionConfig);
+ }
+
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
@@ -701,58 +713,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
*/
public class PerWindowStateStore extends AbstractPerWindowStateStore {
- @Override
- public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
- }
-
- @Override
- public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
- }
-
- @Override
- public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
- }
- @Override
- public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
+ public PerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+ super(keyedStateBackend, executionConfig);
}
@Override
- public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
- }
-
- @Override
- public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
- try {
- return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
- } catch (Exception e) {
- throw new RuntimeException("Could not retrieve state", e);
- }
+ protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
+ return keyedStateBackend.getPartitionedState(
+ window,
+ windowSerializer,
+ stateDescriptor);
}
}
@@ -768,7 +739,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public WindowContext(W window) {
this.window = window;
- this.windowState = windowAssigner instanceof MergingWindowAssigner ? new MergingWindowStateStore() : new PerWindowStateStore();
+ this.windowState = windowAssigner instanceof MergingWindowAssigner ?
+ new MergingWindowStateStore(getKeyedStateBackend(), getExecutionConfig()) :
+ new PerWindowStateStore(getKeyedStateBackend(), getExecutionConfig());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 30382ed..c8368ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,13 @@
package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -53,6 +60,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
import static org.hamcrest.Matchers.contains;
@@ -2508,6 +2516,108 @@ public abstract class WindowOperatorContractTest extends TestLogger {
testCurrentTimeQuerying(new ProcessingTimeAdaptor());
}
+ @Test
+ public void testStateTypeIsConsistentFromWindowStateAndGlobalState() throws Exception {
+
+ class NoOpAggregateFunction implements AggregateFunction<String, String, String> {
+
+ @Override
+ public String createAccumulator() {
+ return null;
+ }
+
+ @Override
+ public String add(String value, String accumulator) {
+ return null;
+ }
+
+ @Override
+ public String getResult(String accumulator) {
+ return null;
+ }
+
+ @Override
+ public String merge(String a, String b) {
+ return null;
+ }
+ }
+
+ WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+ Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+ InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+ KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+ createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+ testHarness.open();
+
+ when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+ .thenReturn(TriggerResult.FIRE);
+
+ when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+ .thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+ AtomicBoolean processWasInvoked = new AtomicBoolean(false);
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+ InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
+ KeyedStateStore windowKeyedStateStore = context.windowState();
+ KeyedStateStore globalKeyedStateStore = context.globalState();
+
+ ListStateDescriptor<String> windowListStateDescriptor = new ListStateDescriptor<String>("windowListState", String.class);
+ ListStateDescriptor<String> globalListStateDescriptor = new ListStateDescriptor<String>("globalListState", String.class);
+ assertEquals(
+ windowKeyedStateStore.getListState(windowListStateDescriptor).getClass(),
+ globalKeyedStateStore.getListState(globalListStateDescriptor).getClass());
+
+ ValueStateDescriptor<String> windowValueStateDescriptor = new ValueStateDescriptor<String>("windowValueState", String.class);
+ ValueStateDescriptor<String> globalValueStateDescriptor = new ValueStateDescriptor<String>("globalValueState", String.class);
+ assertEquals(
+ windowKeyedStateStore.getState(windowValueStateDescriptor).getClass(),
+ globalKeyedStateStore.getState(globalValueStateDescriptor).getClass());
+
+ AggregatingStateDescriptor<String, String, String> windowAggStateDesc = new AggregatingStateDescriptor<String, String, String>(
+ "windowAgg",
+ new NoOpAggregateFunction(),
+ String.class);
+
+ AggregatingStateDescriptor<String, String, String> globalAggStateDesc = new AggregatingStateDescriptor<String, String, String>(
+ "globalAgg",
+ new NoOpAggregateFunction(),
+ String.class);
+ assertEquals(
+ windowKeyedStateStore.getAggregatingState(windowAggStateDesc).getClass(),
+ globalKeyedStateStore.getAggregatingState(globalAggStateDesc).getClass());
+
+ ReducingStateDescriptor<String> windowReducingStateDesc = new ReducingStateDescriptor<String>("windowReducing", (a, b) -> a, String.class);
+ ReducingStateDescriptor<String> globalReducingStateDesc = new ReducingStateDescriptor<String>("globalReducing", (a, b) -> a, String.class);
+ assertEquals(
+ windowKeyedStateStore.getReducingState(windowReducingStateDesc).getClass(),
+ globalKeyedStateStore.getReducingState(globalReducingStateDesc).getClass());
+
+ FoldingStateDescriptor<String, String> windowFoldingStateDescriptor = new FoldingStateDescriptor<String, String>("windowFolding", "", (a, b) -> a, String.class);
+ FoldingStateDescriptor<String, String> globalFoldingStateDescriptor = new FoldingStateDescriptor<String, String>("globalFolding", "", (a, b) -> a, String.class);
+ assertEquals(
+ windowKeyedStateStore.getFoldingState(windowFoldingStateDescriptor).getClass(),
+ globalKeyedStateStore.getFoldingState(globalFoldingStateDescriptor).getClass());
+
+ MapStateDescriptor<String, String> windowMapStateDescriptor = new MapStateDescriptor<String, String>("windowMapState", String.class, String.class);
+ MapStateDescriptor<String, String> globalMapStateDescriptor = new MapStateDescriptor<String, String>("globalMapState", String.class, String.class);
+ assertEquals(windowKeyedStateStore.getMapState(windowMapStateDescriptor).getClass(),
+ globalKeyedStateStore.getMapState(globalMapStateDescriptor).getClass());
+
+ processWasInvoked.set(true);
+ return null;
+ }
+ }).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+ testHarness.processElement(new StreamRecord<>(0, 0L));
+
+ assertTrue(processWasInvoked.get());
+ }
+
public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception {
WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
timeAdaptor.setIsEventTime(mockAssigner);