Posted to issues@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2022/04/13 10:59:00 UTC

[jira] [Created] (FLINK-27223) State access doesn't work as expected when cache size is set to 0

Dian Fu created FLINK-27223:
-------------------------------

             Summary: State access doesn't work as expected when cache size is set to 0
                 Key: FLINK-27223
                 URL: https://issues.apache.org/jira/browse/FLINK-27223
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.15.0
            Reporter: Dian Fu
            Assignee: Dian Fu
             Fix For: 1.15.1


For the following job, which sets python.state.cache-size to 0:
{code}
import json
import logging
import sys

from pyflink.common import Types, Configuration
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.util.java_utils import get_j_env_configuration

if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

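    env = StreamExecutionEnvironment.get_execution_environment()
    # wrap the environment's underlying Java configuration so the option can be set
    config = Configuration(
        j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
    # disable the Python state cache; this is the setting under test
    config.set_integer("python.state.cache-size", 0)
    env.set_parallelism(1)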

    # define the source
    ds = env.from_collection(
        collection=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
    )

    # extract (country, tel) from the JSON, key by country, keep a running sum of tel
    ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
                              json.loads(data.info)['tel'])) \
           .key_by(lambda data: data[0]).sum(1)
    ds.print()
    env.execute()
{code}

The expected result is a running sum of tel per country:
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 167)
{code}
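
As a cross-check, the expected values can be reproduced without Flink. The following is only a plain-Python sketch of the same per-key running sum; the helper name running_sums is made up for illustration and is not part of PyFlink:

```python
import json
from collections import defaultdict

# Same input records as in the job above.
records = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]


def running_sums(rows):
    """Hypothetical helper: emit (country, running sum of tel) per input row."""
    totals = defaultdict(int)  # stands in for the per-key state
    out = []
    for _id, info in rows:
        parsed = json.loads(info)
        country = parsed["addr"]["country"]
        totals[country] += parsed["tel"]
        out.append((country, totals[country]))
    return out


for row in running_sums(records):
    print(row)
```

The last row comes out as ('China', 167) because the total stored for 'China' after the second record (135) is still present when the fourth record (32) arrives.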

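However, the actual result is as follows. The last record yields 32 instead of 135 + 32 = 167, so the sum previously accumulated for key 'China' is not reflected: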
{code}
('Germany', 123)
('China', 135)
('USA', 124)
('China', 32)
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)