Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/31 12:02:04 UTC

[jira] [Commented] (FLINK-6775) StateDescriptor cannot be shared by multiple subtasks

    [ https://issues.apache.org/jira/browse/FLINK-6775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16031052#comment-16031052 ] 

ASF GitHub Bot commented on FLINK-6775:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4025

    [FLINK-6775] [state] Duplicate StateDescriptor's serializer

    Duplicate the TypeSerializer before returning it from the StateDescriptor. That way
    we ensure that StateDescriptors can be shared by multiple threads.
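The pull request description does not include the changed code. The sketch below is a rough, self-contained illustration of the idea, not the actual Flink patch. It models a descriptor whose getter returns a fresh duplicate of a stateful serializer on every call, so no two callers share mutable serializer state. `SketchSerializer`, `SketchDescriptor` and their methods are hypothetical stand-ins. In Flink the corresponding hook is `TypeSerializer#duplicate()`, which stateless serializers may implement by returning `this`.

```java
/**
 * Sketch of "duplicate the serializer on access".
 * All classes here are hypothetical stand-ins, not Flink's API.
 */
public class DuplicateOnAccessSketch {

    /** Hypothetical serializer that reuses a mutable scratch buffer, like a stateful serializer. */
    static final class SketchSerializer {
        private final StringBuilder scratch = new StringBuilder();

        /** Returns a new instance with its own scratch buffer. */
        SketchSerializer duplicate() {
            return new SketchSerializer();
        }

        String serialize(String value) {
            scratch.setLength(0); // the buffer is reused across calls, so one instance is not thread-safe
            scratch.append('[').append(value).append(']');
            return scratch.toString();
        }
    }

    /** Hypothetical descriptor: keeps one prototype and never hands it out directly. */
    static final class SketchDescriptor {
        private final SketchSerializer prototype = new SketchSerializer();

        SketchSerializer getSerializer() {
            return prototype.duplicate();
        }
    }

    public static void main(String[] args) {
        SketchDescriptor shared = new SketchDescriptor();
        SketchSerializer first = shared.getSerializer();
        SketchSerializer second = shared.getSerializer();
        System.out.println(first != second);      // true: each caller gets its own instance
        System.out.println(first.serialize("x")); // [x]
    }
}
```

Allocating a new serializer per access is cheap compared with debugging corrupted state, and a serializer without mutable fields can simply return itself from its duplicate method.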

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixStateDescriptor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4025
    
----
commit 16cb7b12e9d8a59ad957c206f03076fca2143b71
Author: Till Rohrmann <tr...@apache.org>
Date:   2017-05-31T11:59:55Z

    [FLINK-6775] [state] Duplicate StateDescriptor's serializer
    
    Duplicate the TypeSerializer before returning it from the StateDescriptor. That way
    we ensure that StateDescriptors can be shared by multiple threads.

----


> StateDescriptor cannot be shared by multiple subtasks
> -----------------------------------------------------
>
>                 Key: FLINK-6775
>                 URL: https://issues.apache.org/jira/browse/FLINK-6775
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.0.3, 1.1.4, 1.3.0, 1.2.1, 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>
> The {{StateDescriptor}} holds the {{TypeSerializer}} that is used to serialize the state, and it returns that same serializer instance on every access instead of a duplicate. Therefore, the {{StateDescriptor}} cannot be shared if the {{TypeSerializer}} is stateful, as is the case for the {{KryoSerializer}}.
> This problem easily arises when a user writes a stateful operator that defines its {{StateDescriptor}} statically, because a static field is shared by all parallel subtasks of the operator that run in the same JVM. The workaround is to not define the {{StateDescriptor}} statically. However, I would still make this a blocker, because failures caused by concurrent use of the {{TypeSerializer}} are extremely hard for users to debug.
> The following operator reproduces the problem:
> {code}
> private static final class StatefulMapper extends RichMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> implements CheckpointedFunction {
>         private static final long serialVersionUID = -1175717056869107847L;
>         // Root cause: a static descriptor is shared by every parallel subtask of this
>         // function in the same JVM, and so is the serializer it hands out.
>         // PojoType and NestedPojo are test POJOs that are not shown in this issue.
>         private static final ValueStateDescriptor<PojoType> POJO_VALUE_STATE = new ValueStateDescriptor<PojoType>("pojoType", PojoType.class);
>         private transient ValueState<PojoType> valueState;
>         public StatefulMapper() {
>             valueState = null;
>         }
>         @Override
>         public Tuple2<Long, Long> map(Tuple2<Long, Long> tuple) throws Exception {
>             PojoType pojoType = new PojoType(1, 1.0, "1.0", new NestedPojo(2, 2.0));
>             // Depending on the state backend, the value is serialized here or when a
>             // checkpoint is taken; either way the shared serializer may be involved.
>             valueState.update(pojoType);
>             return tuple;
>         }
>         @Override
>         public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {}
>         @Override
>         public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
>             // Keyed state: this function has to be applied to a keyed stream.
>             valueState = functionInitializationContext.getKeyedStateStore().getState(POJO_VALUE_STATE);
>         }
>     }
> {code}
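In a real job the failure above depends on thread timing. The sketch below replays one possible interleaving deterministically on a single thread to show why a shared stateful serializer produces corrupt records while per-subtask instances do not. `StagedSerializer` and `interleave` are hypothetical: the serializer stands in for any serializer with internal buffers (the issue names {{KryoSerializer}} as a real stateful example) and does not model Kryo's actual internals.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Deterministic simulation of two subtasks sharing one stateful serializer.
 * StagedSerializer is a hypothetical stand-in, not a Flink or Kryo class.
 */
public class SharedSerializerDemo {

    /** Builds a record field by field in an internal buffer, then emits it. */
    static final class StagedSerializer {
        private final StringBuilder buffer = new StringBuilder();

        void writeField(String field) {
            if (buffer.length() > 0) {
                buffer.append(',');
            }
            buffer.append(field);
        }

        String finish() {
            String record = buffer.toString();
            buffer.setLength(0); // reset for the next record
            return record;
        }
    }

    /** Replays calls from subtask A and subtask B in an order two threads could produce. */
    static List<String> interleave(StagedSerializer forA, StagedSerializer forB) {
        forA.writeField("a1");
        forB.writeField("b1"); // subtask B starts while A is in the middle of a record
        forA.writeField("a2");
        String recordA = forA.finish();
        forB.writeField("b2");
        String recordB = forB.finish();
        List<String> out = new ArrayList<>();
        out.add(recordA);
        out.add(recordB);
        return out;
    }

    public static void main(String[] args) {
        StagedSerializer shared = new StagedSerializer();
        // Shared instance: A's record absorbs B's field, and B's record loses it.
        System.out.println("shared:     " + interleave(shared, shared));
        // One instance per subtask: both records come out intact.
        System.out.println("per-subtask: " + interleave(new StagedSerializer(), new StagedSerializer()));
    }
}
```

With the shared instance, subtask A emits `a1,b1,a2` and subtask B emits only `b2`; with one instance per subtask the records are `a1,a2` and `b1,b2`. Both the workaround (a non-static descriptor per operator instance) and the fix (duplicating the serializer on access) amount to the second case.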



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)