Posted to dev@flink.apache.org by "Dian Fu (Jira)" <ji...@apache.org> on 2021/04/06 12:46:00 UTC
[jira] [Created] (FLINK-22124) The job finished without any exception if error was thrown during state access
Dian Fu created FLINK-22124:
-------------------------------
Summary: The job finished without any exception if error was thrown during state access
Key: FLINK-22124
URL: https://issues.apache.org/jira/browse/FLINK-22124
Project: Flink
Issue Type: Sub-task
Components: API / Python
Affects Versions: 1.13.0
Reporter: Dian Fu
Fix For: 1.13.0
For the following job:
{code}
import logging

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

seq_num_source = NumberSequenceSource(1, 1000)

file_sink = FileSink \
    .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
                    Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()

ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='file_source',
    type_info=Types.LONG())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        logging.info("open")
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(state_desc)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        existing = self.state.get(value[0])
        if existing is None:
            result = value[1]
            self.state.put(value[0], result)
        elif existing <= 10:
            result = value[1] + existing
            self.state.put(value[0], result)
        else:
            result = existing
        yield result


ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
    .key_by(lambda a: a[0]) \
    .process(MyKeyedProcessFunction(), Types.LONG()) \
    .sink_to(file_sink)

env.execute('data_stream_batch_state')
{code}
Because `self.state.get(value[0])` raises a KeyError, the job should fail. Instead, it finishes without any exception or error message. This should be fixed: at a minimum, the error must appear in the log file so that users can figure out what happened.
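Until this is fixed, a user-side workaround could look roughly like the sketch below. It is not PyFlink code: `DictMapState` is a hypothetical dict-backed stand-in, used only so the example runs without a cluster, and it assumes that the KeyError comes from reading a key that has not been written yet. `get_or_none` and `logged_get` are illustrative helper names, not part of any Flink API.

```python
import logging

logger = logging.getLogger("state_access_sketch")


class DictMapState:
    """Hypothetical stand-in for a map state; NOT the PyFlink class."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        # Assumed behaviour: reading a missing key raises KeyError.
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value

    def contains(self, key):
        return key in self._data


def get_or_none(state, key):
    """Return the stored value, or None when the key is absent."""
    return state.get(key) if state.contains(key) else None


def logged_get(state, key):
    """Read from state; log any failure with its traceback, then re-raise."""
    try:
        return state.get(key)
    except Exception:
        logger.exception("state access failed for key %r", key)
        raise
```

With a guard like `get_or_none`, the `existing is None` branch in `process_element` becomes reachable. With a wrapper like `logged_get`, the failure is written to the Python log before it propagates, so it stays visible even if the runtime does not report it.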