Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/22 23:25:45 UTC

[GitHub] [beam] pabloem commented on a change in pull request #13985: [BEAM-11810] A trigger manager for FnApiRunner

pabloem commented on a change in pull request #13985:
URL: https://github.com/apache/beam/pull/13985#discussion_r599139233



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/trigger_manager.py
##########
@@ -0,0 +1,459 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import collections
+import logging
+import typing
+from collections import defaultdict
+
+from apache_beam import typehints
+from apache_beam.coders import PickleCoder
+from apache_beam.coders import StrUtf8Coder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders import VarIntCoder
+from apache_beam.coders.coders import IntervalWindowCoder
+from apache_beam.transforms import DoFn
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import TriggerContext
+from apache_beam.transforms.trigger import _CombiningValueStateTag
+from apache_beam.transforms.trigger import _StateTag
+from apache_beam.transforms.userstate import AccumulatingRuntimeState
+from apache_beam.transforms.userstate import BagRuntimeState
+from apache_beam.transforms.userstate import BagStateSpec
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+from apache_beam.transforms.userstate import RuntimeTimer
+from apache_beam.transforms.userstate import SetRuntimeState
+from apache_beam.transforms.userstate import SetStateSpec
+from apache_beam.transforms.userstate import TimerSpec
+from apache_beam.transforms.userstate import on_timer
+from apache_beam.transforms.window import BoundedWindow
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import Sessions
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import windowed_value
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+_LOGGER = logging.getLogger(__name__)
+_LOGGER.setLevel(logging.DEBUG)
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+class ReifyWindows(DoFn):
+  """Receives KV pairs, and wraps the values into WindowedValues."""
+  def process(
+      self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
+    try:
+      k, v = element
+    except TypeError:
+      raise TypeCheckError(
+          'Input to GroupByKey must be a PCollection with '
+          'elements compatible with KV[A, B]')
+
+    yield (k, windowed_value.WindowedValue(v, timestamp, [window]))
+
+
+class _GroupBundlesByKey(DoFn):
+  def start_bundle(self):
+    self.keys = defaultdict(list)
+
+  def process(self, element):
+    key, windowed_value = element
+    self.keys[key].append(windowed_value)
+
+  def finish_bundle(self):
+    for k, vals in self.keys.items():
+      yield windowed_value.WindowedValue((k, vals),
+                                         MIN_TIMESTAMP, [GlobalWindow()])
+
+
+def read_watermark(watermark_state):
+  try:
+    return watermark_state.read()
+  except ValueError:
+    watermark_state.add(MIN_TIMESTAMP)
+    return watermark_state.read()
+
+
+class TriggerMergeContext(WindowFn.MergeContext):
+  def __init__(
+      self, all_windows, context: 'FnRunnerStatefulTriggerContext', windowing):

Review comment:
       We use windowfn in a few places (e.g. when merging two windows).
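
A minimal sketch (not from the PR) of how a WindowFn drives merging through a
MergeContext subclass, which is why `windowing` is threaded into
TriggerMergeContext above. The `_PrintingMergeContext` class below is purely
illustrative.

```python
from apache_beam.transforms.window import IntervalWindow, Sessions, WindowFn


class _PrintingMergeContext(WindowFn.MergeContext):
  """Illustrative MergeContext that just reports what Sessions merges."""
  def merge(self, to_be_merged, merge_result):
    # Sessions.merge() calls back here once per group of windows it collapses.
    print('merging %s into %s' % (to_be_merged, merge_result))


windows = [IntervalWindow(0, 10), IntervalWindow(5, 15), IntervalWindow(40, 50)]
Sessions(10).merge(_PrintingMergeContext(windows))
# Expected: the two overlapping windows merge into IntervalWindow(0, 15);
# IntervalWindow(40, 50) is left alone.
```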

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/trigger_manager.py
##########
@@ -0,0 +1,459 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import collections
+import logging
+import typing
+from collections import defaultdict
+
+from apache_beam import typehints
+from apache_beam.coders import PickleCoder
+from apache_beam.coders import StrUtf8Coder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders import VarIntCoder
+from apache_beam.coders.coders import IntervalWindowCoder
+from apache_beam.transforms import DoFn
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import TriggerContext
+from apache_beam.transforms.trigger import _CombiningValueStateTag
+from apache_beam.transforms.trigger import _StateTag
+from apache_beam.transforms.userstate import AccumulatingRuntimeState
+from apache_beam.transforms.userstate import BagRuntimeState
+from apache_beam.transforms.userstate import BagStateSpec
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+from apache_beam.transforms.userstate import RuntimeTimer
+from apache_beam.transforms.userstate import SetRuntimeState
+from apache_beam.transforms.userstate import SetStateSpec
+from apache_beam.transforms.userstate import TimerSpec
+from apache_beam.transforms.userstate import on_timer
+from apache_beam.transforms.window import BoundedWindow
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import Sessions
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import windowed_value
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+_LOGGER = logging.getLogger(__name__)
+_LOGGER.setLevel(logging.DEBUG)
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+class ReifyWindows(DoFn):
+  """Receives KV pairs, and wraps the values into WindowedValues."""
+  def process(
+      self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
+    try:
+      k, v = element
+    except TypeError:
+      raise TypeCheckError(
+          'Input to GroupByKey must be a PCollection with '
+          'elements compatible with KV[A, B]')
+
+    yield (k, windowed_value.WindowedValue(v, timestamp, [window]))
+
+
+class _GroupBundlesByKey(DoFn):
+  def start_bundle(self):
+    self.keys = defaultdict(list)
+
+  def process(self, element):
+    key, windowed_value = element
+    self.keys[key].append(windowed_value)
+
+  def finish_bundle(self):
+    for k, vals in self.keys.items():
+      yield windowed_value.WindowedValue((k, vals),
+                                         MIN_TIMESTAMP, [GlobalWindow()])
+
+
+def read_watermark(watermark_state):
+  try:
+    return watermark_state.read()
+  except ValueError:
+    watermark_state.add(MIN_TIMESTAMP)
+    return watermark_state.read()
+
+
+class TriggerMergeContext(WindowFn.MergeContext):
+  def __init__(
+      self, all_windows, context: 'FnRunnerStatefulTriggerContext', windowing):
+    super(TriggerMergeContext, self).__init__(all_windows)
+    self.trigger_context = context
+    self.windowing = windowing
+    self.merged_away: typing.Dict[BoundedWindow, BoundedWindow] = {}
+
+  def merge(self, to_be_merged, merge_result):
+    _LOGGER.debug("Merging %s into %s", to_be_merged, merge_result)
+    self.trigger_context.merge_state(to_be_merged, merge_result)
+    for window in to_be_merged:
+      if window != merge_result:
+        self.merged_away[window] = merge_result
+        # Clear state associated with PaneInfo since it is
+        # not preserved across merges.
+        self.trigger_context.for_window(window).clear_state(None)
+    self.windowing.triggerfn.on_merge(
+        to_be_merged,
+        merge_result,
+        self.trigger_context.for_window(merge_result))
+
+
+@typehints.with_input_types(
+    typing.Tuple[K, typing.Iterable[windowed_value.WindowedValue]])
+@typehints.with_output_types(
+    typing.Tuple[K, typing.Iterable[windowed_value.WindowedValue]])
+class GeneralTriggerManagerDoFn(DoFn):
+  """A trigger manager that supports all windowing / triggering cases.
+
+  This implements a DoFn that manages triggering on a per-key basis. All
+  elements for a single key are processed together. Per-key state holds data
+  related to all windows.
+  """
+
+  KNOWN_WINDOWS = SetStateSpec('known_windows', IntervalWindowCoder())
+  FINISHED_WINDOWS = SetStateSpec('finished_windows', IntervalWindowCoder())
+  LAST_KNOWN_TIME = CombiningValueStateSpec('last_known_time', combine_fn=max)
+  LAST_KNOWN_WATERMARK = CombiningValueStateSpec(
+      'last_known_watermark', combine_fn=max)
+
+  # TODO(pabloem) What's the coder for the elements/keys here?
+  WINDOW_ELEMENT_PAIRS = BagStateSpec(
+      'element_bag', TupleCoder([IntervalWindowCoder(), PickleCoder()]))
+  WINDOW_TAG_VALUES = BagStateSpec(
+      'per_window_per_tag_value_state',
+      TupleCoder([IntervalWindowCoder(), StrUtf8Coder(), VarIntCoder()]))
+
+  PROCESSING_TIME_TIMER = TimerSpec(
+      'processing_time_timer', TimeDomain.REAL_TIME)
+  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)
+
+  def __init__(self, windowing: Windowing):
+    self.windowing = windowing
+    # Only session windows are merging. Other windows are non-merging.
+    self.merging_windows = isinstance(windowing.windowfn, Sessions)

Review comment:
       done.
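
For context, the line above pins merging behavior to
isinstance(windowing.windowfn, Sessions). A more general check is
WindowFn.is_merging(), which any custom merging WindowFn would also satisfy;
the sketch below is an illustration, not necessarily the change adopted in
the PR.

```python
from apache_beam.transforms.window import FixedWindows, GlobalWindows, Sessions

# Every WindowFn reports whether it can merge windows; NonMergingWindowFn
# subclasses (FixedWindows, SlidingWindows, GlobalWindows) return False.
print(Sessions(10).is_merging())      # True
print(FixedWindows(60).is_merging())  # False
print(GlobalWindows().is_merging())   # False
```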

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/trigger_manager.py
##########
@@ -0,0 +1,459 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import collections
+import logging
+import typing
+from collections import defaultdict
+
+from apache_beam import typehints
+from apache_beam.coders import PickleCoder
+from apache_beam.coders import StrUtf8Coder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders import VarIntCoder
+from apache_beam.coders.coders import IntervalWindowCoder
+from apache_beam.transforms import DoFn
+from apache_beam.transforms.core import Windowing
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.trigger import AccumulationMode
+from apache_beam.transforms.trigger import TriggerContext
+from apache_beam.transforms.trigger import _CombiningValueStateTag
+from apache_beam.transforms.trigger import _StateTag
+from apache_beam.transforms.userstate import AccumulatingRuntimeState
+from apache_beam.transforms.userstate import BagRuntimeState
+from apache_beam.transforms.userstate import BagStateSpec
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+from apache_beam.transforms.userstate import RuntimeTimer
+from apache_beam.transforms.userstate import SetRuntimeState
+from apache_beam.transforms.userstate import SetStateSpec
+from apache_beam.transforms.userstate import TimerSpec
+from apache_beam.transforms.userstate import on_timer
+from apache_beam.transforms.window import BoundedWindow
+from apache_beam.transforms.window import GlobalWindow
+from apache_beam.transforms.window import Sessions
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.transforms.window import WindowFn
+from apache_beam.typehints import TypeCheckError
+from apache_beam.utils import windowed_value
+from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Timestamp
+from apache_beam.utils.windowed_value import WindowedValue
+
+_LOGGER = logging.getLogger(__name__)
+_LOGGER.setLevel(logging.DEBUG)
+
+K = typing.TypeVar('K')
+V = typing.TypeVar('V')
+
+
+class ReifyWindows(DoFn):
+  """Receives KV pairs, and wraps the values into WindowedValues."""
+  def process(
+      self, element, window=DoFn.WindowParam, timestamp=DoFn.TimestampParam):
+    try:
+      k, v = element
+    except TypeError:
+      raise TypeCheckError(
+          'Input to GroupByKey must be a PCollection with '
+          'elements compatible with KV[A, B]')
+
+    yield (k, windowed_value.WindowedValue(v, timestamp, [window]))
+
+
+class _GroupBundlesByKey(DoFn):
+  def start_bundle(self):
+    self.keys = defaultdict(list)
+
+  def process(self, element):
+    key, windowed_value = element
+    self.keys[key].append(windowed_value)
+
+  def finish_bundle(self):
+    for k, vals in self.keys.items():
+      yield windowed_value.WindowedValue((k, vals),
+                                         MIN_TIMESTAMP, [GlobalWindow()])
+
+
+def read_watermark(watermark_state):
+  try:
+    return watermark_state.read()
+  except ValueError:
+    watermark_state.add(MIN_TIMESTAMP)
+    return watermark_state.read()
+
+
+class TriggerMergeContext(WindowFn.MergeContext):
+  def __init__(
+      self, all_windows, context: 'FnRunnerStatefulTriggerContext', windowing):
+    super(TriggerMergeContext, self).__init__(all_windows)
+    self.trigger_context = context
+    self.windowing = windowing
+    self.merged_away: typing.Dict[BoundedWindow, BoundedWindow] = {}
+
+  def merge(self, to_be_merged, merge_result):
+    _LOGGER.debug("Merging %s into %s", to_be_merged, merge_result)
+    self.trigger_context.merge_state(to_be_merged, merge_result)
+    for window in to_be_merged:
+      if window != merge_result:
+        self.merged_away[window] = merge_result
+        # Clear state associated with PaneInfo since it is
+        # not preserved across merges.
+        self.trigger_context.for_window(window).clear_state(None)
+    self.windowing.triggerfn.on_merge(
+        to_be_merged,
+        merge_result,
+        self.trigger_context.for_window(merge_result))
+
+
+@typehints.with_input_types(
+    typing.Tuple[K, typing.Iterable[windowed_value.WindowedValue]])
+@typehints.with_output_types(
+    typing.Tuple[K, typing.Iterable[windowed_value.WindowedValue]])
+class GeneralTriggerManagerDoFn(DoFn):
+  """A trigger manager that supports all windowing / triggering cases.
+
+  This implements a DoFn that manages triggering on a per-key basis. All
+  elements for a single key are processed together. Per-key state holds data
+  related to all windows.
+  """
+
+  KNOWN_WINDOWS = SetStateSpec('known_windows', IntervalWindowCoder())
+  FINISHED_WINDOWS = SetStateSpec('finished_windows', IntervalWindowCoder())
+  LAST_KNOWN_TIME = CombiningValueStateSpec('last_known_time', combine_fn=max)
+  LAST_KNOWN_WATERMARK = CombiningValueStateSpec(
+      'last_known_watermark', combine_fn=max)
+
+  # TODO(pabloem) What's the coder for the elements/keys here?
+  WINDOW_ELEMENT_PAIRS = BagStateSpec(
+      'element_bag', TupleCoder([IntervalWindowCoder(), PickleCoder()]))
+  WINDOW_TAG_VALUES = BagStateSpec(
+      'per_window_per_tag_value_state',
+      TupleCoder([IntervalWindowCoder(), StrUtf8Coder(), VarIntCoder()]))
+
+  PROCESSING_TIME_TIMER = TimerSpec(
+      'processing_time_timer', TimeDomain.REAL_TIME)
+  WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)
+
+  def __init__(self, windowing: Windowing):
+    self.windowing = windowing
+    # Only session windows are merging. Other windows are non-merging.
+    self.merging_windows = isinstance(windowing.windowfn, Sessions)
+
+  def process(
+      self,
+      element: typing.Tuple[K, typing.Iterable[windowed_value.WindowedValue]],
+      element_bag: BagRuntimeState = DoFn.StateParam(WINDOW_ELEMENT_PAIRS),  # type: ignore
+      time_state: AccumulatingRuntimeState = DoFn.StateParam(LAST_KNOWN_TIME),  # type: ignore
+      watermark_state: AccumulatingRuntimeState = DoFn.StateParam(  # type: ignore
+          LAST_KNOWN_WATERMARK),
+      window_tag_values: BagRuntimeState = DoFn.StateParam(WINDOW_TAG_VALUES),  # type: ignore
+      windows_state: SetRuntimeState = DoFn.StateParam(KNOWN_WINDOWS),  # type: ignore
+      finished_windows_state: SetRuntimeState = DoFn.StateParam(  # type: ignore
+          FINISHED_WINDOWS),
+      processing_time_timer=DoFn.TimerParam(PROCESSING_TIME_TIMER),
+      watermark_timer=DoFn.TimerParam(WATERMARK_TIMER),
+      *args, **kwargs):
+    context = FnRunnerStatefulTriggerContext(
+        processing_time_timer=processing_time_timer,
+        watermark_timer=watermark_timer,
+        current_time_state=time_state,
+        watermark_state=watermark_state,
+        elements_bag_state=element_bag,
+        window_tag_values_bag_state=window_tag_values,
+        finished_windows_state=finished_windows_state)
+    key, windowed_values = element
+    watermark = read_watermark(watermark_state)
+
+    windows_to_elements = collections.defaultdict(list)
+    for wv in windowed_values:
+      for window in wv.windows:
+        # ignore expired windows
+        if watermark > window.end + self.windowing.allowed_lateness:
+          continue
+        if window in finished_windows_state.read():
+          continue
+        windows_to_elements[window].append(
+            TimestampedValue(wv.value, wv.timestamp))
+
+    # Processing merging of windows
+    if self.merging_windows:
+      old_windows = set(windows_state.read())
+      all_windows = old_windows.union(list(windows_to_elements))
+      if all_windows != old_windows:
+        merge_context = TriggerMergeContext(
+            all_windows, context, self.windowing)
+        self.windowing.windowfn.merge(merge_context)
+
+        merged_windows_to_elements = collections.defaultdict(list)
+        for window, values in windows_to_elements.items():
+          while window in merge_context.merged_away:
+            window = merge_context.merged_away[window]
+          merged_windows_to_elements[window].extend(values)
+        windows_to_elements = merged_windows_to_elements
+
+      if old_windows != all_windows:
+        for w in windows_to_elements:
+          windows_state.add(w)
+    # Done processing merging of windows
+
+    seen_windows = set()
+    for w in windows_to_elements:
+      window_context = context.for_window(w)
+      seen_windows.add(w)
+      for value_w_timestamp in windows_to_elements[w]:
+        _LOGGER.debug(value_w_timestamp)
+        element_bag.add((w, value_w_timestamp))
+        self.windowing.triggerfn.on_element(windowed_values, w, window_context)
+
+    return self._trigger_fire(
+        key, TimeDomain.WATERMARK, watermark, None, context, seen_windows)
+
+  def _trigger_fire(
+      self,
+      key: K,
+      time_domain,
+      timestamp: Timestamp,
+      timer_tag: typing.Optional[str],
+      context: 'FnRunnerStatefulTriggerContext',
+      windows_of_interest: typing.Optional[typing.Set[BoundedWindow]] = None):
+    windows_to_elements = context.windows_to_elements_map()
+    context.elements_bag_state.clear()
+
+    fired_windows = set()
+    finished_windows = set()
+    _LOGGER.debug(
+        '%s - tag %s - timestamp %s', time_domain, timer_tag, timestamp)
+    for w, elems in windows_to_elements.items():
+      if windows_of_interest is not None and w not in windows_of_interest:
+        # windows_of_interest=None means that we care about all windows.
+        # If we care only about some windows, and this window is not one of
+        # them, then we do not intend to fire this window.
+        continue
+      window_context = context.for_window(w)
+      if self.windowing.triggerfn.should_fire(time_domain,
+                                              timestamp,
+                                              w,
+                                              window_context):
+        finished = self.windowing.triggerfn.on_fire(
+            timestamp, w, window_context)
+        _LOGGER.debug('Firing on window %s. Finished: %s', w, finished)
+        fired_windows.add(w)
+        if finished:
+          context.finished_windows_state.add(w)
+          finished_windows.add(w)

Review comment:
       done.
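
For context on what this DoFn ultimately serves: below is a small,
illustrative pipeline (not a test taken from the PR) of the windowed,
triggered GroupByKey case the trigger manager is meant to handle on the
FnApiRunner. It uses only public Beam APIs; the exact panes emitted depend on
the runner and trigger timing.

```python
import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([('k', i) for i in range(10)])
      # Assign event timestamps so FixedWindows has something to window on.
      | beam.Map(lambda kv: window.TimestampedValue(kv, kv[1]))
      | beam.WindowInto(
          window.FixedWindows(5),
          trigger=trigger.AfterWatermark(early=trigger.AfterCount(3)),
          accumulation_mode=trigger.AccumulationMode.DISCARDING)
      | beam.GroupByKey()
      | beam.Map(print))
```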




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org