You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2021/04/06 12:46:00 UTC

[jira] [Created] (FLINK-22124) The job finished without any exception if error was thrown during state access

Dian Fu created FLINK-22124:
-------------------------------

             Summary: The job finished without any exception if error was thrown during state access
                 Key: FLINK-22124
                 URL: https://issues.apache.org/jira/browse/FLINK-22124
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.13.0
            Reporter: Dian Fu
             Fix For: 1.13.0


For the following job:

{code}
import logging

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

seq_num_source = NumberSequenceSource(1, 1000)

file_sink = FileSink \
    .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
                    Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()

ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='file_source',
    type_info=Types.LONG())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        logging.info("open")
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(state_desc)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        existing = self.state.get(value[0])
        if existing is None:
            result = value[1]
            self.state.put(value[0], result)
        elif existing <= 10:
            result = value[1] + existing
            self.state.put(value[0], result)
        else:
            result = existing
        yield result


ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
    .key_by(lambda a: a[0]) \
    .process(MyKeyedProcessFunction(), Types.LONG()) \
    .sink_to(file_sink)

env.execute('data_stream_batch_state')
{code}

As it will encounter KeyError for `self.state.get(value[0])`, the job finished without any error message. This issue should be addressed. We should make sure the error message appears in the log file to help users to figure out what happens.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)