You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/14 12:10:16 UTC
[09/12] flink git commit: [FLINK-5289] [streaming] Give meaningful
exceptions when using value state on non-keyed stream
[FLINK-5289] [streaming] Give meaningful exceptions when using value state on non-keyed stream
This closes #2969
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c906ad90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c906ad90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c906ad90
Branch: refs/heads/master
Commit: c906ad90ff6410c182995b77b7fd2eed32754989
Parents: 2f3ad58
Author: Stefan Richter <s....@data-artisans.com>
Authored: Thu Dec 8 15:07:58 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Dec 14 12:43:33 2016 +0100
----------------------------------------------------------------------
.../api/operators/StreamingRuntimeContext.java | 32 +++++++++++++++-----
1 file changed, 24 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c906ad90/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index fc9e39e..b450923 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -22,10 +22,12 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.execution.Environment;
@@ -33,6 +35,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
import java.util.List;
import java.util.Map;
@@ -46,12 +49,12 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
/** The operator to which this function belongs */
private final AbstractStreamOperator<?> operator;
-
+
/** The task environment running the operator */
private final Environment taskEnvironment;
private final StreamConfig streamConfig;
-
+
public StreamingRuntimeContext(AbstractStreamOperator<?> operator,
Environment env, Map<String, Accumulator<?, ?>> accumulators) {
super(env.getTaskInfo(),
@@ -60,17 +63,17 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
accumulators,
env.getDistributedCacheEntries(),
operator.getMetricGroup());
-
+
this.operator = operator;
this.taskEnvironment = env;
this.streamConfig = new StreamConfig(env.getTaskConfiguration());
}
// ------------------------------------------------------------------------
-
+
/**
* Returns the input split provider associated with the operator.
- *
+ *
* @return The input split provider.
*/
public InputSplitProvider getInputSplitProvider() {
@@ -106,17 +109,30 @@ public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
- return operator.getKeyedStateStore().getState(stateProperties);
+ KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+ stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+ return keyedStateStore.getState(stateProperties);
}
@Override
public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
- return operator.getKeyedStateStore().getListState(stateProperties);
+ KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+ stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+ return keyedStateStore.getListState(stateProperties);
}
@Override
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
- return operator.getKeyedStateStore().getReducingState(stateProperties);
+ KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
+ stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
+ return keyedStateStore.getReducingState(stateProperties);
+ }
+
+ private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
+ Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
+ KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
+ Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
+ return keyedStateStore;
}
// ------------------ expose (read only) relevant information from the stream config -------- //