Posted to user@flink.apache.org by Jacob Bay Larsen <ma...@jacobbay.dk> on 2016/06/22 19:17:35 UTC
State key serializer has not been configured in the config.
Hi,
I am trying to use a ListState in a RichCoFlatMapFunction, but when I call
getRuntimeContext().getListState(descriptor) in the open() method I get a
"State key serializer has not .." exception. I am not sure how to avoid
this exception. Can any of you help?
Best regards
Jacob
private ListState<Tuple2<String, Integer>> deltaPositions;

@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    // Create state variable
    ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                    "deltaPositions", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
    deltaPositions = getRuntimeContext().getListState(descriptor);
}
2016-06-22 20:41:38,813 INFO org.apache.flink.runtime.taskmanager.Task - Stream of Items with collection of meadian times (1/1) switched to FAILED with exception.
java.lang.RuntimeException: Error while getting state
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131)
    at crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
    at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129)
    ... 8 more
2016-06-22 20:41:38,815 INFO org.apache.flink.runtime.taskmanager.Task - Freeing ta
--
Jacob Bay Larsen
Phone: +45 6133 1108
E-mail: mail@jacobbay.dk
Re: State key serializer has not been configured in the config.
Posted by Maximilian Michels <mx...@apache.org>.
+1 for a more helpful error message.
@Jacob Would you mind opening a JIRA issue at
https://issues.apache.org/jira/browse/FLINK?
On Thu, Jun 23, 2016 at 11:31 AM, Chesnay Schepler <ch...@apache.org> wrote:
> We should adjust the error message so that it mentions the keyed-stream requirement.
>
>
> On 23.06.2016 10:11, Till Rohrmann wrote:
>
> Hi Jacob,
>
> the `ListState` abstraction is a state which we call partitioned/key-value
> state. As such, it is only possible to use it with a keyed stream. This
> means that you have to call `keyBy` after the `connect` API call.
>
> Cheers,
> Till
>
Re: State key serializer has not been configured in the config.
Posted by Chesnay Schepler <ch...@apache.org>.
We should adjust the error message so that it mentions the keyed-stream requirement.
On 23.06.2016 10:11, Till Rohrmann wrote:
> Hi Jacob,
>
> the `ListState` abstraction is a state which we call
> partitioned/key-value state. As such, it is only possible to use it
> with a keyed stream. This means that you have to call `keyBy` after
> the `connect` API call.
>
> Cheers,
> Till
>
Re: State key serializer has not been configured in the config.
Posted by Till Rohrmann <tr...@apache.org>.
Hi Jacob,
the `ListState` abstraction is a state which we call partitioned/key-value
state. As such, it is only possible to use it with a keyed stream. This
means that you have to call `keyBy` after the `connect` API call.
Cheers,
Till
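
A minimal sketch of the fix Till describes, assuming a Flink 1.x DataStream
job written in Java. Calling `keyBy` on the connected streams is what gives
the operator a key (and therefore a state key serializer), so
`getListState` in `open` can succeed. The class names `KeyedConnectSketch`,
`ByName` and `MergeFunction`, the sample elements, and the choice of the
String field as the key are illustrative assumptions and are not taken from
Jacob's job; the `open(Configuration)` signature matches the one in the
original post and may differ in newer Flink releases.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class KeyedConnectSketch {

    // Hypothetical key extractor: both inputs are keyed by their String field (f0).
    public static class ByName implements KeySelector<Tuple2<String, Integer>, String> {
        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

    // Hypothetical co-flat-map: input 1 is buffered per key, input 2 is combined with the buffer.
    public static class MergeFunction extends RichCoFlatMapFunction<
            Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        private transient ListState<Tuple2<String, Integer>> deltaPositions;

        @Override
        public void open(Configuration parameters) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                    new ListStateDescriptor<>(
                            "deltaPositions", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            // Works only because the operator runs on keyed connected streams.
            deltaPositions = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void flatMap1(Tuple2<String, Integer> value,
                             Collector<Tuple2<String, Integer>> out) throws Exception {
            // Stored under the key of the current element.
            deltaPositions.add(value);
        }

        @Override
        public void flatMap2(Tuple2<String, Integer> value,
                             Collector<Tuple2<String, Integer>> out) throws Exception {
            Iterable<Tuple2<String, Integer>> stored = deltaPositions.get();
            if (stored == null) {
                return; // some versions/backends return null for empty state
            }
            for (Tuple2<String, Integer> s : stored) {
                out.collect(Tuple2.of(value.f0, s.f1 + value.f1));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> first =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataStream<Tuple2<String, Integer>> second =
                env.fromElements(Tuple2.of("a", 10));

        first.connect(second)
             .keyBy(new ByName(), new ByName()) // one key selector per input
             .flatMap(new MergeFunction())
             .print();

        env.execute("keyed-connect-sketch");
    }
}
```

Both key selectors have to produce the same key type, since the state is
partitioned by that key for both inputs. Without the `keyBy` line, this
sketch would be expected to fail in `open` with the exception from the
original post.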