Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2017/06/02 09:20:05 UTC

[jira] [Reopened] (FLINK-6775) StateDescriptor cannot be shared by multiple subtasks

     [ https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Metzger reopened FLINK-6775:
-----------------------------------

> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
>                 Key: FLINK-6775
>                 URL: https://issues.apache.org/jira/browse/FLINK-6775
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flink-rel-1.3.1-blockers
>             Fix For: 1.2.2, 1.3.1, 1.4.0
>
>
> The {{StateDescriptor}} contains the {{TypeSerializer}} that is used to serialize the state. The serializer instance is not duplicated when it is accessed, so every subtask that uses the same {{StateDescriptor}} works with the same serializer instance. Therefore, the {{StateDescriptor}} cannot be shared if the {{TypeSerializer}} is stateful, as is the case for the {{KryoSerializer}}.
> This problem can easily arise when a user defines a stateful operator that declares its {{StateDescriptor}} statically. The workaround is to not define a static {{StateDescriptor}}. However, I would still make it a blocker, because failures caused by concurrent use of the {{TypeSerializer}} are extremely hard for the user to debug.
> The following operator produces the problem:
> {code}
> private static final class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {
>     private static final long serialVersionUID = -1175717056869107847L;
>     // Defined statically, so subtasks running in the same JVM share this descriptor and its serializer.
>     private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE = new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
>     private transient ValueState<PojoType> valueState;
>     public StatefulMapper() {
>         valueState = null;
>     }
>     @Override
>     public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>         PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
>         // Updating the state serializes the POJO through the shared serializer.
>         valueState.update(pojoType);
>         return tuple;
>     }
>     @Override
>     public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}
>     @Override
>     public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
>         valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
>     }
> }
> {code}
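
The description above attributes the problem to the serializer not being duplicated on access. As a rough, Flink-free illustration of that idea, the sketch below uses two hypothetical stand-in classes, StatefulSerializer and Descriptor (neither is a Flink class, and this is not the actual fix). It only shows how a statically shared descriptor could hand each caller its own copy of a serializer that keeps internal scratch state.

```java
/**
 * Illustration only: StatefulSerializer and Descriptor are hypothetical
 * stand-ins, not Flink's TypeSerializer or StateDescriptor.
 */
class DescriptorDuplicationSketch {

    /** A serializer that reuses an internal buffer, so one instance must not be used concurrently. */
    static final class StatefulSerializer {
        // Scratch state reused across calls; this is what makes sharing unsafe.
        private final StringBuilder buffer = new StringBuilder();

        String serialize(long value) {
            buffer.setLength(0);
            buffer.append(value);
            return buffer.toString();
        }

        /** Returns an independent copy with its own scratch buffer. */
        StatefulSerializer duplicate() {
            return new StatefulSerializer();
        }
    }

    /** A descriptor that never exposes its prototype serializer directly. */
    static final class Descriptor {
        private final StatefulSerializer prototype = new StatefulSerializer();

        /** Each call hands out a fresh copy instead of the shared instance. */
        StatefulSerializer getSerializer() {
            return prototype.duplicate();
        }
    }

    // Shared statically, like the descriptor in the operator above.
    static final Descriptor SHARED = new Descriptor();

    public static void main(String[] args) {
        StatefulSerializer first = SHARED.getSerializer();
        StatefulSerializer second = SHARED.getSerializer();
        // The two callers hold different instances, so their buffers never interfere.
        System.out.println("distinct instances: " + (first != second));
        System.out.println(first.serialize(42L));
        System.out.println(second.serialize(7L));
    }
}
```

With this shape, sharing the descriptor is harmless because the mutable scratch state lives only in the per-caller copies. The workaround named in the description (not declaring the descriptor static) avoids the sharing in the first place.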


