Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2017/06/02 09:20:05 UTC
[jira] [Reopened] (FLINK-6775) StateDescriptor cannot be shared by multiple subtasks
[ https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger reopened FLINK-6775:
-----------------------------------
> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
> Key: FLINK-6775
> URL: https://issues.apache.org/jira/browse/FLINK-6775
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Priority: Blocker
> Labels: flink-rel-1.3.1-blockers
> Fix For: 1.2.2, 1.3.1, 1.4.0
>
>
> The {{StateDescriptor}} contains the {{TypeSerializer}} that is used to serialize the state. The serializer instance is not duplicated when it is accessed, so every caller receives the same instance. Therefore, the {{StateDescriptor}} cannot be shared across subtasks if the {{TypeSerializer}} is stateful, as is the case for the {{KryoSerializer}}.
> This problem can easily arise when a user defines a stateful operator that declares the {{StateDescriptor}} as a static field. The workaround is to not define the {{StateDescriptor}} statically. However, I would still make this a blocker, because failures caused by concurrent use of the {{TypeSerializer}} are extremely hard for the user to debug.
> The following operator produces the problem:
> {code}
> private static final class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {
>     private static final long serialVersionUID = -1175717056869107847L;
>
>     // Static descriptor: all subtasks in the same JVM share it, and with it the serializer instance.
>     private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE = new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
>
>     private transient ValueState<PojoType> valueState;
>
>     public StatefulMapper() {
>         valueState = null;
>     }
>
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>         PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
>         // The update serializes the POJO using the serializer obtained from the shared descriptor.
>         valueState.update(pojoType);
>         return tuple;
>     }
>
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}
>
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
>         valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
>     }
> }
> {code}
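
The sharing problem described above can be illustrated without Flink. The following is a minimal sketch, not Flink code: `StatefulSerializer` and `Descriptor` are hypothetical stand-ins for a stateful `TypeSerializer` (such as the `KryoSerializer`) and a `StateDescriptor`. It contrasts an accessor that hands out the one stored serializer with an accessor that hands out a duplicate. Duplicating on access is shown only as one possible way to make a statically defined descriptor safe to share; whether the actual fix takes this form is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DescriptorSharingSketch {

    /** Hypothetical stand-in for a stateful serializer: it reuses one internal scratch buffer. */
    static final class StatefulSerializer {
        // Mutable state that makes concurrent use of a single instance unsafe.
        private final byte[] scratch = new byte[64];

        byte[] serialize(String value) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > scratch.length) {
                throw new IllegalArgumentException("value too long for this sketch");
            }
            // Two threads writing to the same scratch buffer could corrupt each other's output.
            System.arraycopy(bytes, 0, scratch, 0, bytes.length);
            return Arrays.copyOf(scratch, bytes.length);
        }

        /** Returns a fresh instance with its own scratch buffer. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** Hypothetical stand-in for a state descriptor that owns a serializer. */
    static final class Descriptor {
        private final StatefulSerializer serializer = new StatefulSerializer();

        // Behaviour described in the issue: every caller gets the same instance.
        StatefulSerializer getSerializerShared() {
            return serializer;
        }

        // Possible remedy (an assumption): every caller gets its own duplicate.
        StatefulSerializer getSerializerDuplicated() {
            return serializer.duplicate();
        }
    }

    // A statically defined descriptor, as in the operator from the issue.
    static final Descriptor SHARED = new Descriptor();

    public static void main(String[] args) {
        boolean sharedIsSame = SHARED.getSerializerShared() == SHARED.getSerializerShared();
        boolean duplicatedIsSame = SHARED.getSerializerDuplicated() == SHARED.getSerializerDuplicated();
        System.out.println("shared access returns same instance: " + sharedIsSame);          // true
        System.out.println("duplicating access returns same instance: " + duplicatedIsSame); // false
    }
}
```

With the shared accessor, two subtasks running in the same JVM would write into the same scratch buffer. With the duplicating accessor, each subtask works on its own serializer instance, so a static descriptor would no longer be a hazard.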