Posted to issues@flink.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/04/13 11:04:00 UTC
[jira] [Updated] (FLINK-27223) State access doesn't work as expected when cache size is set to 0
[ https://issues.apache.org/jira/browse/FLINK-27223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-27223:
-----------------------------------
Labels: pull-request-available (was: )
> State access doesn't work as expected when cache size is set to 0
> -----------------------------------------------------------------
>
> Key: FLINK-27223
> URL: https://issues.apache.org/jira/browse/FLINK-27223
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.15.0
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.1
>
>
> For the following job:
> {code}
> import json
> import logging
> import sys
>
> from pyflink.common import Types, Configuration
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.util.java_utils import get_j_env_configuration
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>
>     env = StreamExecutionEnvironment.get_execution_environment()
>     config = Configuration(
>         j_configuration=get_j_env_configuration(env._j_stream_execution_environment))
>     config.set_integer("python.state.cache-size", 0)
>     env.set_parallelism(1)
>
>     # define the source
>     ds = env.from_collection(
>         collection=[
>             (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
>             (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
>             (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
>             (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
>         ],
>         type_info=Types.ROW_NAMED(["id", "info"], [Types.INT(), Types.STRING()])
>     )
>
>     # key by country and keep a running sum of "tel" per key
>     ds = ds.map(lambda data: (json.loads(data.info)['addr']['country'],
>                               json.loads(data.info)['tel'])) \
>         .key_by(lambda data: data[0]).sum(1)
>     ds.print()
>     env.execute()
> {code}
> The expected result should be:
> {code}
> ('Germany', 123)
> ('China', 135)
> ('USA', 124)
> ('China', 167)
> {code}
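The expected output is a per-key running sum of `tel`: 'China' is seen first with 135 and then with 32, so its second emission should be 135 + 32 = 167. As a cross-check, here is a minimal plain-Python sketch of that semantics (no PyFlink involved; `running_sums` is a hypothetical helper that mimics `map -> key_by(country) -> sum(1)` at parallelism 1). It prints the four expected lines.

```python
import json
from collections import defaultdict

# Same input records as the job above: (id, JSON string)
RECORDS = [
    (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
    (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
    (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
    (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}'),
]


def running_sums(records):
    """Hypothetical reference: emit (country, running sum of tel) per record."""
    state = defaultdict(int)  # per-key accumulated sum, i.e. the keyed state
    out = []
    for _, info in records:
        data = json.loads(info)
        key, tel = data['addr']['country'], data['tel']
        state[key] += tel  # must build on the previously stored value
        out.append((key, state[key]))
    return out


for row in running_sums(RECORDS):
    print(row)
```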
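> However, the actual result is as follows; the last record is ('China', 32) instead of ('China', 167), i.e. the value previously accumulated for the key 'China' is not picked up: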
> {code}
> ('Germany', 123)
> ('China', 135)
> ('USA', 124)
> ('China', 32)
> {code}
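Presumably, setting `python.state.cache-size` to 0 should only disable caching in the Python worker: every read then falls through to the state backend and every update is persisted there, so the running sum should still reach 167. The sketch below models that contract with a hypothetical `CachedKeyedState` class (an LRU cache with write-through in front of a plain dict standing in for the backend). It illustrates the expected behaviour under these assumptions only; it is not PyFlink's implementation and not a diagnosis of this bug.

```python
from collections import OrderedDict


class CachedKeyedState:
    """Hypothetical keyed value state: an LRU cache in front of a backend dict.

    Writes always go through to the backend, so a cache size of 0
    (caching disabled) must not change what readers observe.
    """

    def __init__(self, cache_size):
        self.cache_size = cache_size
        self.cache = OrderedDict()  # key -> value, most recently used last
        self.backend = {}           # stands in for the real state backend

    def get(self, key, default=None):
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        if key in self.backend:
            value = self.backend[key]
            self._remember(key, value)
            return value
        return default

    def put(self, key, value):
        self.backend[key] = value  # write-through: the backend is always current
        self._remember(key, value)

    def _remember(self, key, value):
        if self.cache_size <= 0:
            return  # caching disabled: rely on the backend only
        self.cache[key] = value
        self.cache.move_to_end(key)
        while len(self.cache) > self.cache_size:
            self.cache.popitem(last=False)  # evict least recently used entry


def keyed_sum(pairs, cache_size):
    """Running sum per key, reading and updating state through the cache."""
    state = CachedKeyedState(cache_size)
    out = []
    for key, value in pairs:
        total = state.get(key, 0) + value
        state.put(key, total)
        out.append((key, total))
    return out


PAIRS = [('Germany', 123), ('China', 135), ('USA', 124), ('China', 32)]
for size in (0, 1, 1000):
    print(size, keyed_sum(PAIRS, size))
```

In this model the cache size only affects how many backend lookups happen, never the result: all three runs print the same list ending in ('China', 167).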
--
This message was sent by Atlassian Jira
(v8.20.1#820001)