Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/04/14 04:12:00 UTC

[jira] [Closed] (FLINK-27223) State access doesn't work as expected when cache size is set to 0

     [ https://issues.apache.org/jira/browse/FLINK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dian Fu closed FLINK-27223.
---------------------------
    Fix Version/s: 1.14.5
       Resolution: Fixed

Fixed in:
- master via eee8804bb8db97864ecff6b27853570e75b15807
- release-1.15 via b070a7bb424da40ce0839a1151b110701de48a2c
- release-1.14 via d4de83b4dc66d18acc4355d1edafa95fd1adca7a

> State access doesn't work as expected when cache size is set to 0
> -----------------------------------------------------------------
>
>                 Key: FLINK-27223
>                 URL: https://issues.apache.org/jira/browse/FLINK-27223
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.5, 1.15.1
>
>
> Consider the following job, which sets python.state.cache-size to 0:
> {code}
> import json
> import logging
> import sys
> from pyflink.common import Types, Configuration
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.util.java_utils import get_j_env_configuration
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     env = StreamExecutionEnvironment.get_execution_environment()
>     config = Configuration(
>         j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
>     # set the Python state cache size to 0, the condition under which the bug appears
>     config.set_integer("python.state.cache-size", 0)
>     env.set_parallelism(1)
>     # define the source
>     ds = env.from_collection(
>         collection=[
>             (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
>             (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
>             (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
>             (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
>         ],
>         type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
>     )
>     # extract (country, tel), key by country and keep a running sum of tel
>     ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
>                               json.loads(data.info)['tel'])) \
>            .key_by(lambda data: data[0]).sum(1)
>     ds.print()
>     env.execute()
> {code}
> The expected result is:
> {code}
> ('Germany', 123)
> ('China', 135)
> ('USA', 124)
> ('China', 167)
> {code}
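> However, the actual result is as follows; the second 'China' record is emitted as 32 instead of being added to the previous sum of 135: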
> {code}
> ('Germany', 123)
> ('China', 135)
> ('USA', 124)
> ('China', 32)
> {code}
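The arithmetic behind the two outputs above can be checked with a small plain-Python model of the keyed running sum. This is only a sketch and not PyFlink code: `keyed_running_sum` and its `keep_state` flag are hypothetical names introduced for illustration, and `keep_state=False` merely mimics the observed symptom (the previous sum for a key is not read back). It makes no claim about the actual cause inside PyFlink.

```python
import json

# Same input records as in the job above.
RECORDS = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]


def keyed_running_sum(records, keep_state=True):
    """Model key_by(country).sum(tel) with a dict as per-key state.

    keep_state=False is a hypothetical switch that ignores the stored
    sum on read, which reproduces the symptom reported in this issue.
    """
    state = {}
    results = []
    for _record_id, info in records:
        parsed = json.loads(info)
        key = parsed["addr"]["country"]
        value = parsed["tel"]
        # Read the previous sum for this key (or pretend it is missing).
        previous = state.get(key, 0) if keep_state else 0
        total = previous + value
        state[key] = total
        results.append((key, total))
    return results


if __name__ == "__main__":
    print("expected:", keyed_running_sum(RECORDS))
    print("symptom: ", keyed_running_sum(RECORDS, keep_state=False))
```

With state kept, the second 'China' record yields 135 + 32 = 167. When the stored sum is not read back, it yields 32, which matches the actual result reported above.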


