Posted to issues@beam.apache.org by "Marek Pikulski (Jira)" <ji...@apache.org> on 2021/02/04 13:27:00 UTC

[jira] [Updated] (BEAM-11749) Portable Flink runner skips timers when dynamic timer tags are used

     [ https://issues.apache.org/jira/browse/BEAM-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Marek Pikulski updated BEAM-11749:
----------------------------------
    Description: Timers in Flink runner do not fire as expected. See attached example code for details.  (was: Timers in Flink runner do not fire as expected. See example code below for details:

 
"""Demonstrates Beam timer issue with portable Flink runner.

Run (with 'apache_beam' 2.27.0, OpenJDK 8, Python 3.8.2):

  python timer_test.py

Typical program output:

  ...
  INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
  INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
  INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
  INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
  ...
  (program exits, or hangs with --shutdown_sources_after_idle_ms=9223372036854775807)

Expected program output (based on 'Dynamic timer tags' in
https://beam.apache.org/documentation/programming-guide/#timers):

  ...
  INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
  INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
  INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
  INFO:__main__:Timer fired at Timestamp(1)
  INFO:__main__:Timer fired at Timestamp(2)
  INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
  ...
  (program exit)

Note that in this example, things might seem not too bad, because there
is a final timestamp emitted by the `beam.Create` source. In a true streaming
case, however, the timer would fire only at timestamp 1, so data at
timestamp 2 would never be considered complete.
"""


import sys
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer
from apache_beam.transforms.window import TimestampCombiner


LOGGER = logging.getLogger(__name__)


class TimerTestFn(beam.DoFn):  # noqa: D101

    TIMER_SPEC = TimerSpec('timestamp_expired', TimeDomain.WATERMARK)

    def process(
            self,
            element,
            t=beam.DoFn.TimestampParam,
            timer=beam.DoFn.TimerParam(TIMER_SPEC)
    ) -> None:  # noqa: D102
        LOGGER.info(f"Setting timer to {t}, data: {element}")
        timer.set(t, dynamic_timer_tag=str(t.micros))

    @on_timer(TIMER_SPEC)
    def timer_cb(
            self,
            t=beam.DoFn.TimestampParam,
            tag=beam.DoFn.DynamicTimerTagParam
    ) -> None:  # noqa: D102
        LOGGER.info(f"Timer fired at {t}")


def main():
    """Do the work."""
    logging.basicConfig(level=logging.INFO)
    logging.getLogger(
        "apache_beam.utils.subprocess_server").setLevel(logging.WARNING)

    pipeline_opts = PipelineOptions(
        sys.argv,
        runner='FlinkRunner',
        flink_master='[local]',
        streaming=True)

    with beam.Pipeline(options=pipeline_opts) as p:
        (
            p
            | beam.Create([2, 1])
            | 'AddTimestamps' >> beam.Map(
               lambda element: beam.window.TimestampedValue(
                   element, element))
            | 'GlobalWindow' >> beam.WindowInto(
                    beam.window.GlobalWindows(),
                    trigger=beam.trigger.Repeatedly(
                        beam.trigger.AfterCount(1)),
                    accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
                    timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST
                )
            | 'GroupBy' >> beam.GroupBy(lambda element: None)
            | 'TestTimers' >> beam.ParDo(TimerTestFn())
        )


if __name__ == "__main__":
    main())

> Portable Flink runner skips timers when dynamic timer tags are used
> -------------------------------------------------------------------
>
>                 Key: BEAM-11749
>                 URL: https://issues.apache.org/jira/browse/BEAM-11749
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.27.0
>            Reporter: Marek Pikulski
>            Priority: P2
>         Attachments: timer_test.py
>
>
> Timers in Flink runner do not fire as expected. See attached example code for details.
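
For readers without the attachment, the expected semantics can be sketched with a toy model in plain Python. This is not Beam or Flink code and not a diagnosis of the runner: the class ToyTimerFamily, its methods, and the END_OF_WINDOW value are hypothetical names made up for illustration. The sketch assumes that each dynamic timer tag holds its own pending timer and that advancing the watermark fires every due timer in timestamp order.

```python
from typing import Dict, List, Tuple


class ToyTimerFamily:
    """Toy model of one event-time timer with dynamic tags (not Beam code)."""

    def __init__(self) -> None:
        # One pending timer per tag; setting the same tag again overwrites it.
        self._pending: Dict[str, int] = {}

    def set(self, timestamp: int, dynamic_timer_tag: str = "") -> None:
        self._pending[dynamic_timer_tag] = timestamp

    def advance_watermark(self, watermark: int) -> List[Tuple[str, int]]:
        """Fire every timer whose timestamp is <= watermark, earliest first."""
        due = sorted(
            (ts, tag) for tag, ts in self._pending.items() if ts <= watermark)
        for _, tag in due:
            del self._pending[tag]
        return [(tag, ts) for ts, tag in due]


# Hypothetical stand-in for the end-of-global-window timestamp.
END_OF_WINDOW = 10**6

# Distinct tags: all three timers stay pending and fire in timestamp order.
family = ToyTimerFamily()
for ts in (2, 1, END_OF_WINDOW):
    family.set(ts, dynamic_timer_tag=str(ts))
fired_with_tags = family.advance_watermark(END_OF_WINDOW)

# Same (default) tag every time: each set() replaces the previous timer,
# so only the last one fires.
untagged = ToyTimerFamily()
for ts in (2, 1, END_OF_WINDOW):
    untagged.set(ts)
fired_without_tags = untagged.advance_watermark(END_OF_WINDOW)

print(fired_with_tags)
print(fired_without_tags)
```

In this toy model, the tagged case matches the "expected" firing sequence (1, 2, then the final timestamp). The untagged case, where only the last timer fires, merely resembles the reported output; whether the runner actually behaves this way internally is not established here.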



--
This message was sent by Atlassian Jira
(v8.3.4#803005)