You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:16 UTC

[09/12] flink git commit: [FLINK-5289] [streaming] Give meaningful exceptions when using value state on non-keyed stream

[FLINK-5289] [streaming] Give meaningful exceptions when using value state on non-keyed stream

This closes #2969


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c906ad90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c906ad90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c906ad90

Branch: refs/heads/master
Commit: c906ad90ff6410c182995b77b7fd2eed32754989
Parents: 2f3ad58
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Dec 8 15:07:58 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100

----------------------------------------------------------------------
 .../api/operators/StreamingRuntimeContext.java  | 32 +++++++++++++++-----
 1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c906ad90/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index fc9e39e..b450923 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,10 +22,12 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -46,12 +49,12 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	/** The operator to which this function belongs */
 	private final AbstractStreamOperator<?> operator;
-	
+
 	/** The task environment running the operator */
 	private final Environment taskEnvironment;
 
 	private final StreamConfig streamConfig;
-	
+
 	public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
 									Environment env, Map<String, Accumulator<?, ?>> accumulators) {
 		super(env.getTaskInfo(),
@@ -60,17 +63,17 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 				accumulators,
 				env.getDistributedCacheEntries(),
 				operator.getMetricGroup());
-		
+
 		this.operator = operator;
 		this.taskEnvironment = env;
 		this.streamConfig = new StreamConfig(env.getTaskConfiguration());
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Returns the input split provider associated with the operator.
-	 * 
+	 *
 	 * @return The input split provider.
 	 */
 	public InputSplitProvider getInputSplitProvider() {
@@ -106,17 +109,30 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	@Override
 	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getState(stateProperties);
 	}
 
 	@Override
 	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getListState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getListState(stateProperties);
 	}
 
 	@Override
 	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-		return operator.getKeyedStateStore().getReducingState(stateProperties);
+		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+		return keyedStateStore.getReducingState(stateProperties);
+	}
+
+	private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
+		Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
+		KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
+		Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
+		return keyedStateStore;
 	}
 
 	// ------------------ expose (read only) relevant information from the stream config -------- //