You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/04/01 02:03:00 UTC

[jira] [Work logged] (BEAM-3818) Add support for the streaming side inputs in the Python DirectRunner

     [ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86335 ]

ASF GitHub Bot logged work on BEAM-3818:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Apr/18 02:02
            Start Date: 01/Apr/18 02:02
    Worklog Time Spent: 10m 
      Work Description: mariapython commented on a change in pull request #4949: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated) 
URL: https://github.com/apache/beam/pull/4949#discussion_r178446105
 
 

 ##########
 File path: sdks/python/apache_beam/runners/direct/evaluation_context.py
 ##########
 @@ -69,67 +69,109 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It returns the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      task: (TransformExecutor) task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The (SideInputMap) value of a view when the tasks it blocks are unblocked.
+      Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.input_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
+
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, returns the associated value in requested form.
+  def _update_watermarks_for_side_input_and_unblock_tasks(self,
+                                                          side_input,
+                                                          watermark):
+    """Helps update _SideInputsContainer after a watermark update.
+
+    For each view of the side input, it updates the value of the watermark
+    recorded when the watermark moved and unblocks tasks accordingly.
+
+    Args:
+      side_input: (_UnpickledSideInput) value.
+      watermark: Value of the watermark after an update for a PTransform.
+
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
+    with self._lock:
+      unblocked_tasks = []
+      view = self._views[side_input]
+      view.watermark = watermark
+
+      view = self._views[side_input]
+      tasks_just_unblocked = []
+      for task, block_until in view.blocked_tasks:
+        if watermark.input_watermark >= block_until:
 
 Review comment:
   I think I understand: we would need a test that really took time to execute, correct? In any case, I have pushed a change: input_watermark --> output_watermark.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86335)
    Time Spent: 9h 40m  (was: 9.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3818
>                 URL: https://issues.apache.org/jira/browse/BEAM-3818
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: María GH
>            Assignee: María GH
>            Priority: Minor
>             Fix For: 3.0.0
>
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.  Currently, side inputs are only available for globally-windowed side input PCollections.
> Also, empty side inputs cause a pipeline stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)