Posted to user@beam.apache.org by Bazyli Polednia <ba...@gmail.com> on 2023/06/30 10:29:52 UTC

[Question] Inconsistency between runners in handling empty side inputs

Hi, I am experiencing an inconsistency between DirectRunner and
DataflowRunner while using the side input pattern with FlatMap in streaming
pipelines. Consider the logic in the unit test below:

```
import unittest

from apache_beam import FlatMap, pvalue, WindowInto
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.window import TimestampedValue
from apache_beam.transforms import window
from apache_beam.transforms.trigger import Repeatedly, AfterCount, AccumulationMode

from typing import List, Iterator

class Event:
    def __init__(self, timestamp):
        self.timestamp = timestamp

    def __str__(self):
        return f"Event: {self.timestamp}"


class Message:
    def __init__(self, timestamp):
        self.timestamp = timestamp

    def __str__(self):
        return f"MSG: {self.timestamp}"


def match_message_to_events(
    message: Message, events: List[Event],
) -> Iterator[Event]:
    print(
        f"Trying to match message: {message} "
        f"with events: {[str(event) for event in events]}"
    )

    matched = False
    for event in events:
        if True:  # placeholder for the real matching condition
            matched = True
            yield event

    if not matched:
        return



class TestMatch(unittest.TestCase):
    def test_match_event_with_message(self):
        event_created_at = 23
        message_created_at = [5, 16]

        event_input = TimestampedValue(
            value=Event(timestamp=event_created_at),
            timestamp=event_created_at,
        )

        message_inputs = [
            TimestampedValue(value=Message(timestamp=timestamp), timestamp=timestamp)
            for timestamp in message_created_at
        ]

        stream = (
            TestStream()
            .add_elements([message_inputs[0]], tag="Message")
            .advance_watermark_to(message_inputs[0].timestamp, tag="Message")
            .add_elements([message_inputs[1]], tag="Message")
            .advance_watermark_to(message_inputs[1].timestamp, tag="Message")
            .add_elements([event_input], tag="Event")
            .advance_watermark_to(event_input.timestamp, tag="Event")
            .advance_watermark_to_infinity(tag="Message")
        )

        with TestPipeline() as pipeline:
            input_pcolls = pipeline | "Create" >> stream

            windowed_events = input_pcolls["Event"] | "Window Events" >> WindowInto(
                window.SlidingWindows(10, 5),
                trigger=Repeatedly(AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )
            windowed_messages = input_pcolls[
                "Message"
            ] | "Window Message Main Events" >> WindowInto(
                window.SlidingWindows(10, 5),
                trigger=Repeatedly(AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING,
            )

            output = windowed_messages | "Match Message to previous Events" >> FlatMap(
                match_message_to_events, pvalue.AsList(windowed_events),
            )

```

In short, I have two windowed PCollections in a streaming pipeline, and one
of them serves as a side input to FlatMap. When this code runs on Dataflow
and the side input PCollection's window is empty, the FlatMap step is not
executed at all. The element in the main PCollection then gets stuck in the
pipeline, which stops watermark progression and breaks any later steps that
depend on the watermark (e.g. timers in the watermark time domain).

However, when executed on DirectRunner, the test produces the expected
output, with an empty list passed as the side input for empty windows:
```
Trying to match message: MSG: 5 with events: []
Trying to match message: MSG: 5 with events: []
Trying to match message: MSG: 16 with events: ['Event: 23']
Trying to match message: MSG: 16 with events: []
```
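
The DirectRunner output above follows from assigning each element to its sliding windows and giving each message the events from its window. The pure-Python sketch below reproduces those pairings for `SlidingWindows(10, 5)`. It assumes that the default side-input window mapping sends each main-input window to the identical side-input window, which should hold when both PCollections use the same `SlidingWindows`. The helpers `sliding_windows` and `expected_side_inputs` are hypothetical and are not Beam code:

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def sliding_windows(timestamp: int, size: int = 10, period: int = 5) -> List[Window]:
    """Windows containing `timestamp` for SlidingWindows(size, period) with
    offset 0, latest start first (hypothetical helper, not Beam code)."""
    last_start = timestamp - timestamp % period
    return [(start, start + size) for start in range(last_start, timestamp - size, -period)]


def expected_side_inputs(
    messages: List[int], events: List[int]
) -> List[Tuple[int, Window, List[int]]]:
    """For every (message, window) pair, collect the events assigned to that
    same window, assuming the side input is mapped to the identical window."""
    result = []
    for msg in messages:
        for win in sliding_windows(msg):
            matching = [ev for ev in events if win in sliding_windows(ev)]
            result.append((msg, win, matching))
    return result


for msg, win, evs in expected_side_inputs([5, 16], [23]):
    print(f"MSG: {msg} in window {win} -> events: {evs}")
```

This yields the same four (message, events) pairings as the DirectRunner run. The three pairs with an empty event list are the empty side-input windows in question.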

Only when the Dataflow job is drained and the watermark advances to
infinity does Dataflow pass the empty windows to the side input.

What is the suggested solution to this issue? Is there a way to force
Dataflow to pass empty windows as the side input?