You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/05/16 12:49:27 UTC

flink git commit: [FLINK-9174] Make type of state created in ProcessWindowFunction.process() consistent

Repository: flink
Updated Branches:
  refs/heads/master 0b5e124ba -> 63740e7c5


[FLINK-9174] Make type of state created in ProcessWindowFunction.process() consistent


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63740e7c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63740e7c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63740e7c

Branch: refs/heads/master
Commit: 63740e7c51800599d6d4d77bb7762600df2bad88
Parents: 0b5e124
Author: sihuazhou <su...@163.com>
Authored: Sat Apr 14 18:02:24 2018 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed May 16 14:47:40 2018 +0200

----------------------------------------------------------------------
 .../runtime/state/DefaultKeyedStateStore.java   |   6 +-
 .../operators/windowing/WindowOperator.java     |  75 ++++---------
 .../windowing/WindowOperatorContractTest.java   | 110 +++++++++++++++++++
 3 files changed, 137 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
index e0bb7b7..b970081 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultKeyedStateStore.java
@@ -44,8 +44,8 @@ import static java.util.Objects.requireNonNull;
  */
 public class DefaultKeyedStateStore implements KeyedStateStore {
 
-	private final KeyedStateBackend<?> keyedStateBackend;
-	private final ExecutionConfig executionConfig;
+	protected final KeyedStateBackend<?> keyedStateBackend;
+	protected final ExecutionConfig executionConfig;
 
 	public DefaultKeyedStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
 		this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
@@ -120,7 +120,7 @@ public class DefaultKeyedStateStore implements KeyedStateStore {
 		}
 	}
 
-	private <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
+	protected  <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
 		return keyedStateBackend.getPartitionedState(
 				VoidNamespace.INSTANCE,
 				VoidNamespaceSerializer.INSTANCE,

http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 9e3898c..ecce1fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingState;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.AppendingState;
@@ -45,6 +46,8 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.DefaultKeyedStateStore;
+import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
@@ -163,7 +166,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	protected transient Context triggerContext = new Context(null, null);
 
-	protected transient WindowContext processContext = new WindowContext(null);
+	protected transient WindowContext processContext;
 
 	protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
 
@@ -654,17 +657,26 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * Base class for per-window {@link KeyedStateStore KeyedStateStores}. Used to allow per-window
 	 * state access for {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
 	 */
-	public abstract class AbstractPerWindowStateStore implements KeyedStateStore {
+	public abstract class AbstractPerWindowStateStore extends DefaultKeyedStateStore {
 
 		// we have this in the base class even though it's not used in MergingKeyStore so that
 		// we can always set it and ignore what actual implementation we have
 		protected W window;
+
+		public AbstractPerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+			super(keyedStateBackend, executionConfig);
+		}
 	}
 
 	/**
 	 * Special {@link AbstractPerWindowStateStore} that doesn't allow access to per-window state.
 	 */
 	public class MergingWindowStateStore extends AbstractPerWindowStateStore {
+
+		public MergingWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+			super(keyedStateBackend, executionConfig);
+		}
+
 		@Override
 		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
 			throw new UnsupportedOperationException("Per-window state is not allowed when using merging windows.");
@@ -701,58 +713,17 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	 * {@link org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction}.
 	 */
 	public class PerWindowStateStore extends AbstractPerWindowStateStore {
-		@Override
-		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
-		}
-
-		@Override
-		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
-		}
-
-		@Override
-		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
-		}
 
-		@Override
-		public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
+		public PerWindowStateStore(KeyedStateBackend<?> keyedStateBackend, ExecutionConfig executionConfig) {
+			super(keyedStateBackend, executionConfig);
 		}
 
 		@Override
-		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
-		}
-
-		@Override
-		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
-			try {
-				return WindowOperator.this.getPartitionedState(window, windowSerializer, stateProperties);
-			} catch (Exception e) {
-				throw new RuntimeException("Could not retrieve state", e);
-			}
+		protected  <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
+			return keyedStateBackend.getPartitionedState(
+				window,
+				windowSerializer,
+				stateDescriptor);
 		}
 	}
 
@@ -768,7 +739,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 		public WindowContext(W window) {
 			this.window = window;
-			this.windowState = windowAssigner instanceof MergingWindowAssigner ?  new MergingWindowStateStore() : new PerWindowStateStore();
+			this.windowState = windowAssigner instanceof MergingWindowAssigner ?
+				new MergingWindowStateStore(getKeyedStateBackend(), getExecutionConfig()) :
+				new PerWindowStateStore(getKeyedStateBackend(), getExecutionConfig());
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/63740e7c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 30382ed..c8368ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,13 @@
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -53,6 +60,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
 import static org.hamcrest.Matchers.contains;
@@ -2508,6 +2516,108 @@ public abstract class WindowOperatorContractTest extends TestLogger {
 		testCurrentTimeQuerying(new ProcessingTimeAdaptor());
 	}
 
+	@Test
+	public void testStateTypeIsConsistentFromWindowStateAndGlobalState() throws Exception {
+
+		class NoOpAggregateFunction implements AggregateFunction<String, String, String> {
+
+			@Override
+			public String createAccumulator() {
+				return null;
+			}
+
+			@Override
+			public String add(String value, String accumulator) {
+				return null;
+			}
+
+			@Override
+			public String getResult(String accumulator) {
+				return null;
+			}
+
+			@Override
+			public String merge(String a, String b) {
+				return null;
+			}
+		}
+
+		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
+		Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
+		InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction();
+
+		KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness =
+			createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction);
+
+		testHarness.open();
+
+		when(mockTrigger.onElement(anyInt(), anyLong(), anyTimeWindow(), anyTriggerContext()))
+			.thenReturn(TriggerResult.FIRE);
+
+		when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext()))
+			.thenReturn(Arrays.asList(new TimeWindow(0, 20)));
+
+		AtomicBoolean processWasInvoked = new AtomicBoolean(false);
+
+		doAnswer(new Answer<Object>() {
+			@Override
+			public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
+				InternalWindowFunction.InternalWindowContext context = (InternalWindowFunction.InternalWindowContext) invocationOnMock.getArguments()[2];
+				KeyedStateStore windowKeyedStateStore = context.windowState();
+				KeyedStateStore globalKeyedStateStore = context.globalState();
+
+				ListStateDescriptor<String> windowListStateDescriptor = new ListStateDescriptor<String>("windowListState", String.class);
+				ListStateDescriptor<String> globalListStateDescriptor = new ListStateDescriptor<String>("globalListState", String.class);
+				assertEquals(
+					windowKeyedStateStore.getListState(windowListStateDescriptor).getClass(),
+					globalKeyedStateStore.getListState(globalListStateDescriptor).getClass());
+
+				ValueStateDescriptor<String> windowValueStateDescriptor = new ValueStateDescriptor<String>("windowValueState", String.class);
+				ValueStateDescriptor<String> globalValueStateDescriptor = new ValueStateDescriptor<String>("globalValueState", String.class);
+				assertEquals(
+					windowKeyedStateStore.getState(windowValueStateDescriptor).getClass(),
+					globalKeyedStateStore.getState(globalValueStateDescriptor).getClass());
+
+				AggregatingStateDescriptor<String, String, String> windowAggStateDesc = new AggregatingStateDescriptor<String, String, String>(
+					"windowAgg",
+					new NoOpAggregateFunction(),
+					String.class);
+
+				AggregatingStateDescriptor<String, String, String> globalAggStateDesc = new AggregatingStateDescriptor<String, String, String>(
+					"globalAgg",
+					new NoOpAggregateFunction(),
+					String.class);
+				assertEquals(
+					windowKeyedStateStore.getAggregatingState(windowAggStateDesc).getClass(),
+					globalKeyedStateStore.getAggregatingState(globalAggStateDesc).getClass());
+
+				ReducingStateDescriptor<String> windowReducingStateDesc = new ReducingStateDescriptor<String>("windowReducing", (a, b) -> a, String.class);
+				ReducingStateDescriptor<String> globalReducingStateDesc = new ReducingStateDescriptor<String>("globalReducing", (a, b) -> a, String.class);
+				assertEquals(
+					windowKeyedStateStore.getReducingState(windowReducingStateDesc).getClass(),
+					globalKeyedStateStore.getReducingState(globalReducingStateDesc).getClass());
+
+				FoldingStateDescriptor<String, String> windowFoldingStateDescriptor = new FoldingStateDescriptor<String, String>("windowFolding", "", (a, b) -> a, String.class);
+				FoldingStateDescriptor<String, String> globalFoldingStateDescriptor = new FoldingStateDescriptor<String, String>("globalFolding", "", (a, b) -> a, String.class);
+				assertEquals(
+					windowKeyedStateStore.getFoldingState(windowFoldingStateDescriptor).getClass(),
+					globalKeyedStateStore.getFoldingState(globalFoldingStateDescriptor).getClass());
+
+				MapStateDescriptor<String, String> windowMapStateDescriptor = new MapStateDescriptor<String, String>("windowMapState", String.class, String.class);
+				MapStateDescriptor<String, String> globalMapStateDescriptor = new MapStateDescriptor<String, String>("globalMapState", String.class, String.class);
+				assertEquals(windowKeyedStateStore.getMapState(windowMapStateDescriptor).getClass(),
+					globalKeyedStateStore.getMapState(globalMapStateDescriptor).getClass());
+
+				processWasInvoked.set(true);
+				return null;
+			}
+		}).when(mockWindowFunction).process(anyInt(), anyTimeWindow(), anyInternalWindowContext(), anyIntIterable(), WindowOperatorContractTest.<Void>anyCollector());
+
+		testHarness.processElement(new StreamRecord<>(0, 0L));
+
+		assertTrue(processWasInvoked.get());
+	}
+
 	public void testCurrentTimeQuerying(final TimeDomainAdaptor timeAdaptor) throws Exception {
 		WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner();
 		timeAdaptor.setIsEventTime(mockAssigner);