Posted to issues@beam.apache.org by "Marek Pikulski (Jira)" <ji...@apache.org> on 2021/02/04 13:17:00 UTC
[jira] [Created] (BEAM-11749) Portable Flink runner skips timers when dynamic timer tags are used
Marek Pikulski created BEAM-11749:
-------------------------------------
Summary: Portable Flink runner skips timers when dynamic timer tags are used
Key: BEAM-11749
URL: https://issues.apache.org/jira/browse/BEAM-11749
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.27.0
Reporter: Marek Pikulski
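Timers in the portable Flink runner do not fire as expected when dynamic timer tags are used: in the example below, only the last timer that was set fires, and the earlier ones are skipped. See the example code below for details: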
"""Demonstrates Beam timer issue with portable Flink runner.

Run (with 'apache_beam' 2.27.0, OpenJDK 8, Python 3.8.2):

    python timer_test.py

Typical program output:

    ...
    INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
    INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
    INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
    INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
    ...
    (program exits, or hangs with --shutdown_sources_after_idle_ms=9223372036854775807)

Expected program output (based on 'Dynamic timer tags' in
https://beam.apache.org/documentation/programming-guide/#timers):

    ...
    INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
    INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
    INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
    INFO:__main__:Timer fired at Timestamp(1)
    INFO:__main__:Timer fired at Timestamp(2)
    INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
    ...
    (program exit)

Note that in this example, things might seem not too bad, because there
is a final timestamp emitted by the `beam.Create` source. In a true streaming
case, however, the timer would fire only at timestamp 1, so data at
timestamp 2 would never be considered complete.
"""

import sys
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer
from apache_beam.transforms.window import TimestampCombiner

LOGGER = logging.getLogger(__name__)


class TimerTestFn(beam.DoFn):  # noqa: D101

    TIMER_SPEC = TimerSpec('timestamp_expired', TimeDomain.WATERMARK)

    def process(
        self,
        element,
        t=beam.DoFn.TimestampParam,
        timer=beam.DoFn.TimerParam(TIMER_SPEC)
    ) -> None:  # noqa: D102
        LOGGER.info(f"Setting timer to {t}, data: {element}")
        timer.set(t, dynamic_timer_tag=str(t.micros))

    @on_timer(TIMER_SPEC)
    def timer_cb(
        self,
        t=beam.DoFn.TimestampParam,
        tag=beam.DoFn.DynamicTimerTagParam
    ) -> None:  # noqa: D102
        LOGGER.info(f"Timer fired at {t}")


def main():
    """Do the work."""
    logging.basicConfig(level=logging.INFO)
    logging.getLogger(
        "apache_beam.utils.subprocess_server").setLevel(logging.WARNING)

    pipeline_opts = PipelineOptions(
        sys.argv,
        runner='FlinkRunner',
        flink_master='[local]',
        streaming=True)

    with beam.Pipeline(options=pipeline_opts) as p:
        (
            p
            | beam.Create([2, 1])
            | 'AddTimestamps' >> beam.Map(
                lambda element: beam.window.TimestampedValue(
                    element, element))
            | 'GlobalWindow' >> beam.WindowInto(
                beam.window.GlobalWindows(),
                trigger=beam.trigger.Repeatedly(
                    beam.trigger.AfterCount(1)),
                accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
                timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST
            )
            | 'GroupBy' >> beam.GroupBy(lambda element: None)
            | 'TestTimers' >> beam.ParDo(TimerTestFn())
        )


if __name__ == "__main__":
    main()
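
For reference, the semantics the expected output relies on can be sketched without Beam at all. The following is only a toy model, under the assumption (taken from the 'Dynamic timer tags' section of the programming guide) that each distinct tag within a timer family is an independent timer and that re-setting an existing tag overwrites it. `ToyTimerFamily`, `advance_watermark`, and `simulate` are hypothetical names made up for illustration; none of this is Beam or Flink runner code, and it says nothing about where the actual bug lies.

```python
from typing import Dict, List, Tuple


class ToyTimerFamily:
    """Toy model of one timer family with dynamic tags (NOT Beam code)."""

    def __init__(self) -> None:
        # One pending fire time per tag; re-setting a tag overwrites it.
        self._pending: Dict[str, int] = {}

    def set(self, fire_at: int, dynamic_timer_tag: str = "") -> None:
        self._pending[dynamic_timer_tag] = fire_at

    def advance_watermark(self, watermark: int) -> List[Tuple[str, int]]:
        """Return (tag, fire_at) for every timer due at or before the
        watermark, in timestamp order, and remove them from the pending set.

        Simplification: a real runner has its own rules for when the
        watermark is considered to have passed a timer's timestamp.
        """
        due = sorted(
            (fire_at, tag)
            for tag, fire_at in self._pending.items()
            if fire_at <= watermark
        )
        for _, tag in due:
            del self._pending[tag]
        return [(tag, fire_at) for fire_at, tag in due]


def simulate() -> List[Tuple[str, int]]:
    """Mirror the reproducer: set timers at 2, 1, then a final timestamp."""
    family = ToyTimerFamily()
    # 10 is a stand-in for the end-of-input timestamp seen in the logs.
    for ts in (2, 1, 10):
        family.set(ts, dynamic_timer_tag=str(ts))
    return family.advance_watermark(10)


if __name__ == "__main__":
    for tag, fire_at in simulate():
        print(f"Timer fired at {fire_at} (tag {tag!r})")
```

Under this model all three timers fire, in timestamp order, which matches the expected output in the docstring. The typical output reported above would instead correspond to only the last entry surviving.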