Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/18 17:21:55 UTC

[GitHub] [beam] TheNeuralBit opened a new pull request, #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

TheNeuralBit opened a new pull request, #17384:
URL: https://github.com/apache/beam/pull/17384

   WIP because this is building on unmerged PR #17253 
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864047641


##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv
+
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class ConcreteWindowedBatch(WindowedBatch):
+  cdef public object values

Review Comment:
   Yeah, the WindowedValue(None) approach could make sense. The downside is that it's not "columnar", but for that matter neither is the current implementation, which is based on Python lists. An optimized version could be yet another future WindowedBatch implementation.
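
One possible reading of the "WindowedValue(None)" idea, sketched with stand-in types rather than Beam's real classes: the batch keeps its values in one list and its per-element metadata in a parallel list of value-less records. `Meta` and `ListBackedBatch` are hypothetical names invented for this illustration. The sketch also shows why such a layout is not columnar: the metadata is a Python list of per-element objects, not one array per field.

```python
from typing import Any, List, NamedTuple, Tuple


class Meta(NamedTuple):
    """Stand-in for a WindowedValue whose value is None (hypothetical)."""
    value: Any
    timestamp: int
    windows: Tuple[str, ...]


class ListBackedBatch:
    """Hypothetical heterogeneous batch: values plus one Meta per element."""

    def __init__(self, values: List[Any], metas: List[Meta]):
        if len(values) != len(metas):
            raise ValueError("values and metas must have the same length")
        self.values = values
        self.metas = metas

    def with_values(self, new_values: List[Any]) -> "ListBackedBatch":
        # Reuse the metadata list as-is; only the payload changes.
        return ListBackedBatch(new_values, self.metas)

    def explode(self) -> List[Meta]:
        # Re-attach each value to its per-element metadata.
        return [m._replace(value=v) for v, m in zip(self.values, self.metas)]


batch = ListBackedBatch(
    [1, 2], [Meta(None, 10, ("w1",)), Meta(None, 20, ("w2",))])
doubled = batch.with_values([v * 2 for v in batch.values])
exploded = doubled.explode()
```

In this sketch `with_values` is cheap because the metadata list is shared, while `explode` still pays one object allocation per element.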





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863261750


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter

Review Comment:
   I know I added this in response to some test failure, but I can't recall which one, and removing it now doesn't seem to produce an error. This was probably needed in some previous iteration of PerWindowInvoker. I'll drop it.





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863282859


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1200,6 +1404,13 @@ def process(self, windowed_value):
       self._reraise_augmented(exn)
       return []
 
+  def process_batch(self, windowed_batch):

Review Comment:
   Just to make it clear that the producer is responsible for handling any type conversions?





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864288357


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when contstructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)

Review Comment:
   Done. I added an `update_counters_batch` that just counts elements, and I call it at the end of `receive_batch`. Does that seem reasonable?
   
   Also dropped a TODO for tracking byte size.
   
   Remaining work here:
   - [ ] Right now this code is exercised in the `test_batch_*` tests, but they don't actually verify it. I should at least add a test to exercise it in opcounters_test.
   - [ ] File a jira and update the TODO-XXX
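
As a rough sketch of the counting approach described above, assuming a counter that tracks only element counts: per-element updates add one, and a per-batch update adds the batch length in a single call. `ElementCounter` and its method names are hypothetical stand-ins, not Beam's operation counters, and byte-size tracking is left out just as in the TODO.

```python
class ElementCounter:
    """Hypothetical stand-in for the operation counters discussed above."""

    def __init__(self):
        self.element_count = 0

    def update_for_element(self):
        # Called once per element on the element-wise path.
        self.element_count += 1

    def update_for_batch(self, batch_length):
        # Called once per batch; byte-size tracking is deliberately omitted.
        self.element_count += batch_length


counter = ElementCounter()
counter.update_for_element()
counter.update_for_batch(len([10, 20, 30]))
```

A test along the lines of the first checkbox could then assert on `counter.element_count` directly instead of relying on the pipeline tests to exercise the code path.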





[GitHub] [beam] robertwb commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863277468


##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv

Review Comment:
   Yeah, it doesn't know the type of other. I suppose making this public isn't any worse than plain-vanilla Python. (The alternative is to cast, but that is less clean with our hybrid works-in-pure-and-compiled mode.)
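
For context, a minimal pure-Python sketch of the equality pattern under discussion. `other` is an arbitrary object until the `isinstance` check passes, so reading its wrapped field goes through ordinary attribute lookup; in compiled Cython that lookup presumably only works if the attribute is exposed, which is what `cdef public` does. `Wrapper` and `_wrapped` are hypothetical names for illustration.

```python
class Wrapper:
    """Hypothetical class wrapping a value in a conventionally private field."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __eq__(self, other):
        # `other` is untyped here, so `other._wrapped` is a plain
        # attribute lookup rather than a typed field access.
        if isinstance(other, Wrapper):
            return self._wrapped == other._wrapped
        return NotImplemented

    def __hash__(self):
        # Keep hashing consistent with equality.
        return hash(self._wrapped)


same = Wrapper((1, 2)) == Wrapper((1, 2))
different_type = Wrapper((1, 2)) == (1, 2)
```

Returning `NotImplemented` lets Python try the reflected comparison and then fall back to identity, so comparing against an unrelated type yields `False` instead of raising.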





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864048616


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):

Review Comment:
   Removed along with all of ConcreteWindowedBatch





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117994023

   Portable_Python PreCommit has passed, but is not reflected in the GitHub UI: https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Commit/22208/
   
   I'll go ahead and merge.




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864046412


##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv
+
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class ConcreteWindowedBatch(WindowedBatch):

Review Comment:
   This has been removed now; I'll rename it to Heterogeneous when it's added back.





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117595689

   It looks like it's a pre-existing issue related to passing a large input (on the order of 1000 elements) through test_batch_pardo in order to trigger a flush. I can reproduce the flake in a pipeline that doesn't have a batched DoFn:
   
   ```
     def test_large_input_pardo(self):
       with self.create_pipeline() as p:
         res = (
             p
             | beam.Create(np.array(range(5000), dtype=np.int64)).with_output_types(np.int64)
             | beam.Map(lambda e: e * 2)
             | beam.Map(lambda e: e + 3))
         assert_that(res, equal_to([(i * 2) + 3 for i in range(5000)]))
   ```
   
   It's worth noting that I haven't seen the flake in a Cython environment. Perhaps it's a bug in the "slow" stack, which is often only tested with trivial inputs?
   
   I don't think I'll be able to diagnose the existing issue today. A solution for this PR might be to skip the flush-triggering test in non-cythonized environments.
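
The skip proposed above could look roughly like the following, using only the standard `unittest` module. `USING_COMPILED_WORKER` is a hypothetical flag standing in for whatever check the real test suite would use to detect a cythonized environment, and the test bodies are placeholders rather than real pipelines.

```python
import unittest

# Assumption: a flag recording whether compiled (Cython) modules are in use.
# It is hard-coded here so the sketch runs without Cython installed.
USING_COMPILED_WORKER = False


class LargeInputTest(unittest.TestCase):
    @unittest.skipUnless(
        USING_COMPILED_WORKER, "flaky on the pure-Python stack")
    def test_flush_triggering_input(self):
        # Placeholder for the large-input case that triggers a flush.
        self.assertEqual(sum(range(5000)), 12497500)

    def test_small_input(self):
        # Small inputs still run in every environment.
        self.assertEqual([e * 2 + 3 for e in range(3)], [3, 5, 7])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(LargeInputTest)
result = unittest.TestResult()
suite.run(result)
```

With the flag set to `False`, the large-input test is reported as skipped with the given reason instead of failing intermittently, which keeps the pre-existing issue visible in test output.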




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863285586


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogenous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(windowed_batch)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)
+      # TODO(BEAM-14292): Add TimestampedBatch, an analogue for TimestampedValue

Review Comment:
   Ack, that could be a better solution. We could make it work nicely with schemas + process_batch





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863260992


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):

Review Comment:
   These are used in `PerWindowInvoker._invoke_process_batch_per_window` to populate DoFn params
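
To illustrate why a homogeneous batch can expose a single `timestamp` and `windows` for DoFn params, here is a minimal sketch of grouping elements by their shared event-time metadata, in the spirit of the HOMOGENEOUS branch in the diff above. `WV` and `group_homogeneous` are hypothetical stand-ins, not Beam's classes, and pane info is omitted for brevity.

```python
import collections
from typing import Any, Dict, List, NamedTuple, Tuple


class WV(NamedTuple):
    """Stand-in for a windowed value (hypothetical, not Beam's class)."""
    value: Any
    timestamp: int
    windows: Tuple[str, ...]


def group_homogeneous(wvs: List[WV]) -> List[WV]:
    """Return one WV per distinct (timestamp, windows), holding a value list."""
    grouped: Dict[Tuple[int, Tuple[str, ...]], List[Any]] = (
        collections.defaultdict(list))
    for wv in wvs:
        grouped[(wv.timestamp, wv.windows)].append(wv.value)
    # Each group shares one timestamp and one window tuple by construction.
    return [
        WV(values, timestamp, windows)
        for (timestamp, windows), values in grouped.items()
    ]


batches = group_homogeneous(
    [WV("a", 1, ("w",)), WV("b", 1, ("w",)), WV("c", 2, ("w",))])
```

Because every element in a group has identical metadata, a per-batch `timestamp` or `windows` property is well defined, which is not true of a batch that mixes timestamps or windows.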





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1115511230

   OK, I need to call it a day. First thing tomorrow I'll work on tearing out `ConcreteWindowedBatch` and addressing the other open comments.




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863272376


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogenous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(windowed_batch)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   Ah yes, thank you. FWIW this was lifted from: https://github.com/apache/beam/blob/0daef62a7bd993b13064de80588e343ee764e004/sdks/python/apache_beam/runners/common.py#L1354-1355





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863268400


##########
sdks/python/apache_beam/transforms/combiners.py:
##########
@@ -88,7 +90,7 @@ def expand(self, pcoll):
 
 # TODO(laolu): This type signature is overly restrictive. This should be
 # more general.
-@with_input_types(Union[float, int])
+@with_input_types(Union[float, int, np.int64, np.float64])

Review Comment:
   Note I had to make this typehint change so I could use CombinePerKey in one of my tests. Is this alright?





[GitHub] [beam] robertwb commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117569231

   That kind of error generally means the coders got messed up somewhere. 




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863290541


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when contstructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   > No, it will be logged once per instance (which could be a lot, hundreds of times in streaming).
   
   Is there any way to reduce the number of times it's logged?
   
   > Ideally we could just name the two operations involved.
   
   My concern about it not being actionable was just that it might be difficult to translate from the Operation names back to the code that produced them. Looking at it more closely, it does look like `Operation` itself has a user-interpretable `__str__` implementation.
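
   One way to cut down on repeated log lines is to remember which producer/consumer pairs have already been reported. The following is only a minimal sketch of that idea: `warn_once` and the `_already_warned` cache are hypothetical names, not part of Beam, and the cache is per process, so each worker process would still log once.

```python
import logging

_LOGGER = logging.getLogger(__name__)

# Hypothetical module-level cache of (producer, consumer) pairs already reported.
_already_warned = set()


def warn_once(producer_name, consumer_name):
    """Log the rebatching warning at most once per pair in this process.

    Returns True if a message was logged, False if it was suppressed.
    """
    key = (producer_name, consumer_name)
    if key in _already_warned:
        return False
    _already_warned.add(key)
    _LOGGER.warning(
        "Batches from %s must be exploded and rebatched for %s",
        producer_name,
        consumer_name)
    return True
```

   This does not reduce logging across workers; it only keeps a single process from repeating itself for every instance.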





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863293075


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogenous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(windowed_batch)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   Should we just construct a new instance here instead?
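
   For comparison, a rough sketch of the "new instance" alternative, using a frozen dataclass as a hypothetical stand-in for `HomogeneousWindowedBatch` (the real class wraps a `WindowedValue`; `Batch` and `with_repeated_windows` are illustrative names only):

```python
from dataclasses import dataclass, replace
from typing import Any, Tuple


@dataclass(frozen=True)
class Batch:
    """Hypothetical stand-in for a homogeneous windowed batch."""
    values: Any
    timestamp: int
    windows: Tuple[str, ...]


def with_repeated_windows(batch, input_window_count):
    """Return a batch whose windows are repeated once per input window.

    Mirrors `windows *= len(input.windows)` from the diff, but builds a new
    object instead of mutating the one the DoFn yielded.
    """
    if input_window_count == 1:
        return batch
    return replace(batch, windows=batch.windows * input_window_count)
```

   The batch object yielded by the user's DoFn is left untouched, so a DoFn that holds on to it and yields it again would not see its windows grow.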





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117822892

   Yeah, I filed BEAM-14410 to track. Good point about reproducing without this PR. I'll add the new test in a separate PR and verify that it reproduces the flake there.




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864047907


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fasted way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:

Review Comment:
   Removed along with all of ConcreteWindowedBatch





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864372731


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when contstructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   Ok, I added an `InefficientExecutionWarning` for this. For now it just logs the _consuming_ operation and the batch type hints involved; I think that should be enough for a user to pinpoint the issue.
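
   As an illustration of the shape such a warning might take (a sketch only: the base class, the `warn_rebatch` helper, and the message wording are assumptions, since the actual definition is not shown in this thread):

```python
import warnings


class InefficientExecutionWarning(RuntimeWarning):
    """Stand-in for the warning named above; the base class is an assumption."""


def warn_rebatch(consumer, producer_batch_type, consumer_batch_type):
    """Warn that input to `consumer` has to be exploded and rebatched.

    `consumer` is expected to have a readable string form, as Operation does.
    """
    message = (
        f"Input to operation {consumer} must be rebatched from type "
        f"{producer_batch_type!r} to {consumer_batch_type!r}. "
        "This is inefficient; consider aligning the batch types.")
    warnings.warn(message, InefficientExecutionWarning, stacklevel=2)
    return message
```

   Using a dedicated warning category would let users silence or escalate it with the standard `warnings` filters.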





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117641053

   See [2067b13](https://github.com/apache/beam/pull/17384/commits/2067b132dc63106bd08dc22a6aaaa1f72876b06c) for an attempted resolution of the flake (we'll see how the CI checks go). I separated out the flush-triggering test and added a new element-wise DoFn test that also triggers the flake. Both of these are skipped when running with a non-cythonized coder_impl.




[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1112774127

   @robertwb I think this is largely ready. I added a new `WindowedBatch` implementation, `HomogeneousWindowedBatch`, which is effectively a wrapper around `WindowedValue` where we know the value is a batch. We now enforce that batches are partitioned by `(timestamp, windows, pane_info)` and only create `HomogeneousWindowedBatch` instances.
   
   The old implementation, `ConcreteWindowedBatch`, isn't used, so it's probably worth backing it out and saving it for a future PR.
   
   Other notes: I've added support for {Timestamp, PaneInfo, Window}Param on process_batch (and we validate at construction time that only these are used). Other DoFn params that were discussed in https://s.apache.org/batched-dofns are not yet supported.
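
   The partitioning by `(timestamp, windows, pane_info)` can be sketched as below. This follows the grouping in `WindowedBatch.from_windowed_values` from the diff, but uses a hypothetical `WV` named tuple in place of `WindowedValue` and plain strings for windows and pane info, so it runs without Beam:

```python
from collections import defaultdict
from typing import Any, NamedTuple, Tuple


class WV(NamedTuple):
    """Hypothetical stand-in for WindowedValue."""
    value: Any
    timestamp: int
    windows: Tuple[str, ...]
    pane_info: str


def partition_homogeneous(windowed_values, produce_fn=list):
    """Yield one batch per distinct (timestamp, windows, pane_info) key.

    Each yielded WV carries a batch (built by produce_fn) as its value, so all
    elements in a batch share the same event-time metadata.
    """
    grouped = defaultdict(list)
    for wv in windowed_values:
        key = (wv.timestamp, tuple(wv.windows), wv.pane_info)
        grouped[key].append(wv.value)
    for (timestamp, windows, pane_info), values in grouped.items():
        yield WV(produce_fn(values), timestamp, windows, pane_info)
```

   Input with many distinct timestamps will produce many small batches under this scheme, which is the trade-off for keeping a single timestamp, window set, and pane per batch.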




[GitHub] [beam] TheNeuralBit commented on pull request #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1103004515

   Run Python PreCommit




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r853599231


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1170,6 +1241,9 @@ def __init__(self,
     else:
       per_element_output_counter = None
 
+    # TODO: output processor assumes DoFns are batch-to-batch or
+    # element-to-element, @yields_batches and @yields_elements will break this
+    # assumption.

Review Comment:
   Tag the appropriate Jira issue here



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -329,7 +432,9 @@ def finish(self):
     # type: () -> None
 
     """Finish operation."""
-    pass
+    # TODO: Do we need an output_index here

Review Comment:
   This is still an open question



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1450,78 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        # TODO: Helpful message
+        raise RuntimeError
+      if isinstance(result, WindowedBatch):
+        windowed_batch = result
+        # TODO: Should we do this in batches?
+        #       Would need to require one batch per window
+        #if (windowed_input_element is not None and
+        #    len(windowed_input_element.windows) != 1):
+        #  windowed_value.windows *= len(windowed_input_element.windows)
+      # TODO: TimestampedBatch
+      #elif isinstance(result, TimestampedValue):
+      #  assign_context = WindowFn.AssignContext(result.timestamp, result.value)
+      #  windowed_value = WindowedValue(
+      #      result.value,
+      #      result.timestamp,
+      #      self.window_fn.assign(assign_context))
+      #  if len(windowed_input_element.windows) != 1:
+      #    windowed_value.windows *= len(windowed_input_element.windows)
+      else:
+        # TODO: This should error unless the DoFn was defined with
+        # @DoFn.yields_batches(output_aligned_with_input=True)
+        # We should consider also validating that the length is the same as
+        # windowed_input_batch
+        windowed_batch = windowed_input_batch.with_values(result)
+
+      if watermark_estimator is not None:
+        for timestamp in windowed_batch.timestamps:
+          watermark_estimator.observe_timestamp(timestamp)
+      if tag is None:
+        self.main_receivers.receive_batch(windowed_batch)
+      else:
+        self.tagged_receivers[tag].receive_batch(windowed_batch)

Review Comment:
   OutputProcessor logic needs to be cleaned up





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863241349


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:
##########
@@ -121,6 +126,148 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([6, 12, 18]))
+
+  def test_batch_rebatch_pardos(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.ParDo(ListPlusOneDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([9, 15, 21]))
+
+  def test_batch_pardo_fusion_break(self):
+    class NormalizeDoFn(beam.DoFn):
+      @no_type_check
+      def process_batch(
+          self,
+          batch: np.ndarray,
+          mean: np.float64,
+      ) -> Iterator[np.ndarray]:
+        assert isinstance(batch, np.ndarray)
+        yield batch - mean
+
+      # infer_output_type must be defined (when there's no process method),

Review Comment:
   It does fall back to Any, but in this case I want the element type to be specific since it also represents the element type of the ndarray 





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864393722


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   In the interest of getting this in for the release cut, I may punt on this and send a separate PR to fix it in both places at once.





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864023084


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:

Review Comment:
   Done, [0f2e5b9](https://github.com/apache/beam/pull/17384/commits/0f2e5b9243e70eb6cf4f7c60f11c636371cad849)





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1116837666

   OK, I think all of the major points are addressed, @robertwb. I will come back for another pass tomorrow morning to clean up lint checks, populate the BEAM-XXX jiras, address any other nits, etc.




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r855538744


##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,14 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+@cython.final
+cdef class WindowedBatch(object):

Review Comment:
   WindowedBatch should be an interface
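
One way to read "interface" here is an abstract base class that concrete batch types subclass, rather than a final class. The sketch below is plain Python (not Cython), and the names `WindowedBatchInterface` and `SingleTimestampBatch` are hypothetical; it only illustrates the shape of such an interface.

```python
import abc
from typing import Any, Callable, Iterable, Tuple


class WindowedBatchInterface(abc.ABC):
  """Hypothetical interface; implementations decide how metadata is stored."""
  @abc.abstractmethod
  def with_values(self, new_values: Any) -> 'WindowedBatchInterface':
    """Returns a new batch with the same event-time metadata."""

  @abc.abstractmethod
  def as_windowed_values(
      self, explode_fn: Callable) -> Iterable[Tuple[Any, int]]:
    """Yields one (value, timestamp) pair per element in the batch."""


class SingleTimestampBatch(WindowedBatchInterface):
  """Toy implementation where every element shares one timestamp."""
  def __init__(self, values: Any, timestamp: int):
    self.values = values
    self.timestamp = timestamp

  def with_values(self, new_values):
    return SingleTimestampBatch(new_values, self.timestamp)

  def as_windowed_values(self, explode_fn):
    for value in explode_fn(self.values):
      yield (value, self.timestamp)
```

The abstract class cannot be instantiated directly, so callers are forced to pick a concrete representation.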





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864394015


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()

Review Comment:
   We discussed this offline. For now, this implementation will be as good as what I'm doing in receive_batch: element counts will be correct, but byte size estimates can be off. Improving it will be tracked as part of the same TODO.





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117418018

   It seems I've somehow introduced some flakiness in `test_batch_pardo`. I'm not sure how that happened.
   
   ```
   apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:141: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   apache_beam/pipeline.py:596: in __exit__
       self.result = self.run()
   apache_beam/pipeline.py:573: in run
       return self.runner.run_pipeline(self, self._options)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:200: in run_pipeline
       pipeline.to_runner_api(default_environment=self._default_environment))
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:208: in run_via_runner_api
       return self.run_stages(stage_context, stages)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:409: in run_stages
       runner_execution_context, bundle_context_manager, bundle_input)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:742: in _execute_bundle
       bundle_manager))
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:966: in _run_bundle
       data_input, data_output, input_timers, expected_timer_output)
   apache_beam/runners/portability/fn_api_runner/fn_runner.py:1309: in process_bundle
       output.transform_id).append(output.data)
   apache_beam/runners/portability/fn_api_runner/execution.py:242: in append
       windowed_key_value = coder_impl.decode_from_stream(input_stream, True)
   apache_beam/coders/coder_impl.py:1446: in decode_from_stream
       value = self._value_coder.decode_from_stream(in_stream, nested)
   apache_beam/coders/coder_impl.py:994: in decode_from_stream
       c in enumerate(self._coder_impls)
   apache_beam/coders/coder_impl.py:994: in <listcomp>
       c in enumerate(self._coder_impls)
   apache_beam/coders/coder_impl.py:617: in decode_from_stream
       return in_stream.read_all(nested)
   apache_beam/coders/slow_stream.py:137: in read_all
       return self.read(self.read_var_int64() if nested else self.size())
   apache_beam/coders/slow_stream.py:148: in read_var_int64
       byte = self.read_byte()
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   self = <apache_beam.coders.slow_stream.InputStream object at 0x7f8433d63190>
   
       def read_byte(self):
         # type: () -> int
         self.pos += 1
   >     return self.data[self.pos - 1]
   E     IndexError: index out of range
   
   apache_beam/coders/slow_stream.py:142: IndexError
   ```
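
The bottom frames of this trace show `read_var_int64` calling `read_byte` past the end of the buffer. The sketch below reproduces only that failure mode with a simplified stand-in reader. `TinyInputStream` is hypothetical, it ignores signed int64 handling, and it is not Beam's `slow_stream.InputStream`. It does not diagnose why the stream was short in the flaky test.

```python
class TinyInputStream:
  """Simplified stand-in for a byte-stream reader (not Beam's implementation)."""
  def __init__(self, data: bytes):
    self.data = data
    self.pos = 0

  def read_byte(self) -> int:
    self.pos += 1
    # Raises IndexError once pos has run past the end of the data.
    return self.data[self.pos - 1]

  def read_var_int64(self) -> int:
    # Base-128 varint: 7 payload bits per byte, high bit means "more follows".
    shift = 0
    result = 0
    while True:
      byte = self.read_byte()
      result |= (byte & 0x7F) << shift
      shift += 7
      if not byte & 0x80:
        return result

  def read_all_nested(self) -> bytes:
    # A nested value is encoded as a varint length prefix plus the payload.
    length = self.read_var_int64()
    start = self.pos
    self.pos += length
    return self.data[start:self.pos]


def fails_with_index_error(data: bytes) -> bool:
  """Returns True if decoding a length prefix from data raises IndexError."""
  try:
    TinyInputStream(data).read_var_int64()
  except IndexError:
    return True
  return False
```

An empty buffer, or one that ends on a byte with the continuation bit set, fails in `read_byte` in the same way as the trace above.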




[GitHub] [beam] robertwb commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117818972

   (Assuming we can also reproduce this flake without this PR.)




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864288357


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)

Review Comment:
   Done. I added an update_counters_batch method that just counts elements, and I call it at the end of receive_batch. Does that seem reasonable?
   
   Also dropped a TODO for tracking byte size.
   
   Remaining work here:
   - [ ] Right now this code is exercised in the `test_batch_*` tests, but that doesn't actually verify it. I should at least add a test to exercise it in opcounters_test
   - [x] File a jira and update the TODO-XXX
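
A rough sketch of the element-count-only approach described above, including the kind of check a dedicated counter test might make. `ElementCounter` and its methods are hypothetical stand-ins, not Beam's `OperationCounters` API.

```python
class ElementCounter:
  """Hypothetical stand-in for an operation counter."""
  def __init__(self):
    self.element_count = 0

  def update_from(self, element):
    # Per-element path: one call, one element.
    self.element_count += 1

  def update_from_batch(self, batch, length_fn=len):
    # Batch path: count elements only. Byte size is deliberately not
    # estimated here, matching the TODO mentioned in the comment above.
    self.element_count += length_fn(batch)


def count_mixed_inputs(elements, batches):
  """Feeds elements and batches through one counter and returns the total."""
  counter = ElementCounter()
  for element in elements:
    counter.update_from(element)
  for batch in batches:
    counter.update_from_batch(batch)
  return counter.element_count
```

The `length_fn` parameter is an assumption: batch types that do not support `len` would need some other way to report their size.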





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863246975


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):

Review Comment:
   No, this implementation is never used, since everything is built on HomogeneousWindowedBatch for now.
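
For context, the homogeneous path groups values that share identical event-time metadata, so each batch needs only one copy of it. The sketch below mirrors that grouping with a hypothetical `WV` named tuple in place of `WindowedValue`. It omits pane info for brevity and is not the PR's implementation.

```python
import collections
from typing import Any, Callable, Iterable, NamedTuple, Sequence, Tuple


class WV(NamedTuple):
  """Hypothetical stand-in for a windowed value (pane info omitted)."""
  value: Any
  timestamp: int
  windows: Tuple[str, ...]


def homogeneous_batches(
    windowed_values: Sequence[WV],
    produce_fn: Callable = list) -> Iterable[WV]:
  """Yields one batch per distinct (timestamp, windows) combination."""
  grouped = collections.defaultdict(list)
  for wv in windowed_values:
    grouped[(wv.timestamp, tuple(wv.windows))].append(wv.value)

  for (timestamp, windows), values in grouped.items():
    # The batch itself is stored as the value of a single windowed value.
    yield WV(produce_fn(values), timestamp, windows)
```

Inputs with many distinct timestamps would produce many small batches under this scheme, which is the case a per-element representation would be meant to cover.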





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864050411


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()

Review Comment:
   I'm not sure how to resolve this





[GitHub] [beam] codecov[bot] commented on pull request #17384: [WIP][BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1102961344

   # [Codecov](https://codecov.io/gh/apache/beam/pull/17384?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17384](https://codecov.io/gh/apache/beam/pull/17384?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (bfcae6a) into [master](https://codecov.io/gh/apache/beam/commit/fa1c6afb1ec25fcb56e3965135e1cb764dbf14f0?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (fa1c6af) will **increase** coverage by `0.08%`.
   > The diff coverage is `82.94%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #17384      +/-   ##
   ==========================================
   + Coverage   74.09%   74.18%   +0.08%     
   ==========================================
     Files         684      686       +2     
     Lines       89332    89936     +604     
   ==========================================
   + Hits        66195    66715     +520     
   - Misses      21977    22061      +84     
     Partials     1160     1160              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.66% <82.94%> (+0.02%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/17384?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `87.69% <65.71%> (-2.44%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `92.22% <80.85%> (-0.43%)` | :arrow_down: |
   | [sdks/python/apache\_beam/typehints/batch.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHlwZWhpbnRzL2JhdGNoLnB5) | `82.83% <82.83%> (ø)` | |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `71.51% <87.32%> (+1.50%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/windowed\_value.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvd2luZG93ZWRfdmFsdWUucHk=) | `93.75% <98.24%> (+1.54%)` | :arrow_up: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.68% <100.00%> (+0.03%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `92.68% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ache\_beam/runners/portability/expansion\_service.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9leHBhbnNpb25fc2VydmljZS5weQ==) | `91.83% <0.00%> (-1.35%)` | :arrow_down: |
   | [...che\_beam/runners/interactive/interactive\_runner.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9ydW5uZXIucHk=) | `92.25% <0.00%> (-0.71%)` | :arrow_down: |
   | [...apache\_beam/runners/dataflow/internal/apiclient.py](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kYXRhZmxvdy9pbnRlcm5hbC9hcGljbGllbnQucHk=) | `77.36% <0.00%> (-0.21%)` | :arrow_down: |
   | ... and [14 more](https://codecov.io/gh/apache/beam/pull/17384/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   > **Legend**
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Last update fa1c6af...bfcae6a.
   




[GitHub] [beam] robertwb commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117817787

   If the flake is unrelated to batching, skipping it sounds fine to me (but please file a Jira).
   




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863276113


##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv

Review Comment:
   Without it the equality checks in windowed_value_test fail, e.g.:
   ```
   __________________________ WindowedBatchTest.test_homogeneous_windowed_batch_with_values ___________________________
   
   self = <apache_beam.utils.windowed_value_test.WindowedBatchTest testMethod=test_homogeneous_windowed_batch_with_values>
   
       def test_homogeneous_windowed_batch_with_values(self):
         pane_info = windowed_value.PaneInfo(
             True, True, windowed_value.PaneInfoTiming.ON_TIME, 0, 0)
         wb = windowed_value.HomogeneousWindowedBatch.of(['foo', 'bar'],
                                                         6, (),
                                                         pane_info)
   >     self.assertEqual(
             wb.with_values(['baz', 'foo']),
             windowed_value.HomogeneousWindowedBatch.of(['baz', 'foo'],
                                                        6, (),
                                                        pane_info))
   
   apache_beam/utils/windowed_value_test.py:145: 
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
   
   >   return self._wv == other._wv
   E   AttributeError: 'apache_beam.utils.windowed_value.HomogeneousWindow' object has no attribute '_wv'
   ```
   
   I'm certainly open to a better solution here. I'm not sure why that's happening - is it because of the reference to `other._wv`?





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864050411


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()

Review Comment:
   I'm not sure how to fix this though.
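
For readers following the thread, the three-way partition performed in `GeneralPurposeConsumerSet.__init__` in the hunk above can be sketched in isolation. This is only an illustrative sketch: `FakeConsumer` and `partition_consumers` are hypothetical stand-ins (not Beam classes), and plain strings stand in for `BatchConverter` instances.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FakeConsumer:
  """Hypothetical stand-in for an Operation; not part of Beam."""
  name: str
  supports_elements: bool
  supports_batches: bool
  batch_converter: Optional[str] = None  # stand-in for a BatchConverter


def partition_consumers(consumers, producer_batch_converter):
  """Splits consumers into element, passthrough-batch and rebatch groups."""
  element_consumers = []
  passthrough_batch_consumers = []
  other_batch_consumers = defaultdict(list)

  for consumer in consumers:
    if not consumer.supports_batches:
      # Element-only consumer.
      element_consumers.append(consumer)
    elif consumer.batch_converter == producer_batch_converter:
      # Batch type matches the producer: hand batches through unchanged.
      passthrough_batch_consumers.append(consumer)
    elif consumer.supports_elements:
      # Mismatched batch type, but elements are acceptable.
      element_consumers.append(consumer)
    else:
      # Last resort: explode and rebatch with the consumer's own converter.
      assert consumer.batch_converter is not None
      other_batch_consumers[consumer.batch_converter].append(consumer)

  return (
      element_consumers,
      passthrough_batch_consumers,
      dict(other_batch_consumers))
```

Grouping the last category by converter means each distinct consumer batch type only needs to be rebatched once, however many consumers share it.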





[GitHub] [beam] robertwb commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864277512


##########
sdks/python/apache_beam/transforms/combiners.py:
##########
@@ -88,7 +90,7 @@ def expand(self, pcoll):
 
 # TODO(laolu): This type signature is overly restrictive. This should be
 # more general.
-@with_input_types(Union[float, int])
+@with_input_types(Union[float, int, np.int64, np.float64])

Review Comment:
   Yes, this is fine. 



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results may be a generator, so len() cannot be called on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   +1





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863281360


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),

Review Comment:
   Great idea, done!
   
   To make this work I also had to update `WindowedValue.__hash__` to coerce windows to a tuple.
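
The need for the tuple coercion can be illustrated with a small sketch. The suggestion being answered is not visible in this excerpt, so this only shows the general point: a dictionary key must be hashable, and a list of windows is not. `FakeWindowedValue` and `group_homogeneous` are hypothetical names used for illustration, not Beam APIs.

```python
from collections import defaultdict
from typing import Any, NamedTuple, Sequence


class FakeWindowedValue(NamedTuple):
  """Hypothetical stand-in for WindowedValue; not part of Beam."""
  value: Any
  timestamp: int
  windows: Sequence[Any]  # may arrive as a list or as a tuple
  pane_info: str


def group_homogeneous(windowed_values):
  """Groups values that share timestamp, windows and pane info."""
  grouped = defaultdict(list)
  for wv in windowed_values:
    # tuple(...) makes the key hashable even when windows is a list, and
    # makes list-backed and tuple-backed windows land in the same group.
    key = (wv.timestamp, tuple(wv.windows), wv.pane_info)
    grouped[key].append(wv.value)
  return dict(grouped)
```

Without the coercion, a key containing a list raises `TypeError: unhashable type: 'list'` as soon as it is used in the dictionary.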





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863280259


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]

Review Comment:
   Sorry, which suggestion above?
   
   I went ahead and removed all the defaults for `pane_infos` in the WindowedBatch implementations.



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    return create_batch(
+        new_values, self.timestamps_micros, self.windows, self.pane_infos)
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value, timestamp, windows, pane_info in zip(explode_fn(self.values),
+                                                    self.timestamps_micros,
+                                                    self.windows,
+                                                    self._pane_infos_iter()):
+      yield create(value, timestamp, windows, pane_info)
+
+  def _pane_infos_iter(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      return itertools.repeat(self.pane_infos, len(self.timestamps_micros))
+    else:
+      return self.pane_infos
+
+  def __eq__(self, other):
+    if isinstance(other, ConcreteWindowedBatch):
+      return (
+          type(self) == type(other) and
+          self.timestamps_micros == other.timestamps_micros and
+          self.values == other.values and self.windows == other.windows and
+          self.pane_infos == other.pane_infos)
+    return NotImplemented
+
+  def __hash__(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      pane_infos_hash = hash(self.pane_infos)
+    else:
+      pane_infos_hash = sum(hash(p) for p in self.pane_infos)
+
+    return ((hash(self.values) & 0xFFFFFFFFFFFFFFF) + 3 *
+            (sum(self.timestamps_micros) & 0xFFFFFFFFFFFFFF) + 7 *
+            (sum(hash(w) for w in self.windows) & 0xFFFFFFFFFFFFF) + 11 *
+            (pane_infos_hash & 0xFFFFFFFFFFFFF))
+
+
+def create_batch(

Review Comment:
   Done





[GitHub] [beam] robertwb commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863163035


##########
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:
##########
@@ -121,6 +126,148 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([6, 12, 18]))
+
+  def test_batch_rebatch_pardos(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
+              np.int64)
+          | beam.ParDo(ArrayMultiplyDoFn())
+          | beam.ParDo(ListPlusOneDoFn())
+          | beam.Map(lambda x: x * 3))
+
+      assert_that(res, equal_to([9, 15, 21]))
+
+  def test_batch_pardo_fusion_break(self):
+    class NormalizeDoFn(beam.DoFn):
+      @no_type_check
+      def process_batch(
+          self,
+          batch: np.ndarray,
+          mean: np.float64,
+      ) -> Iterator[np.ndarray]:
+        assert isinstance(batch, np.ndarray)
+        yield batch - mean
+
+      # infer_output_type must be defined (when there's no process method),

Review Comment:
   Does it not just fall back to Any? 



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()

Review Comment:
   This is not going to properly record the sizes of lazily observed values that were (only?) placed into a batch.



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)

Review Comment:
   We should put some limit here on the batch size, even if it's a fixed constant.
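
A minimal sketch of the kind of cap suggested above, assuming a fixed constant is acceptable for now. `BoundedBatchBuffer`, `MAX_BATCH_SIZE`, and `flush_fn` are hypothetical names for illustration, not part of the PR or of Beam:

```python
from typing import Any, Callable, List

# Hypothetical cap; the review asks only for "some limit", not this value.
MAX_BATCH_SIZE = 1024


class BoundedBatchBuffer:
  """Buffers elements and flushes as soon as the cap is reached."""
  def __init__(
      self,
      flush_fn: Callable[[List[Any]], None],
      max_batch_size: int = MAX_BATCH_SIZE) -> None:
    self._flush_fn = flush_fn
    self._max_batch_size = max_batch_size
    self._elements: List[Any] = []

  def add(self, element: Any) -> None:
    self._elements.append(element)
    if len(self._elements) >= self._max_batch_size:
      self.flush()

  def flush(self) -> None:
    # Swap the buffer out before calling flush_fn so the buffer is
    # already empty if flush_fn raises or re-enters add().
    if self._elements:
      elements, self._elements = self._elements, []
      self._flush_fn(elements)
```

In the quoted `receive`, the append to `_batched_elements` would go through something like `add`, with the remaining elements flushed when the bundle finishes.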



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv

Review Comment:
   Why is this public?



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)

Review Comment:
   At least increment the element counter by the batch size and drop a TODO about getting the bytes (from the batch converter(?)). 
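
A rough sketch of that bookkeeping, assuming the batch length can be obtained from a callable. `BatchAwareCounters` and `get_length` are hypothetical stand-ins, not the counter API used by `ConsumerSet`:

```python
from typing import Any, Callable


class BatchAwareCounters:
  """Hypothetical stand-in for the ConsumerSet counter bookkeeping."""
  def __init__(self) -> None:
    self.element_count = 0

  def update_for_element(self) -> None:
    self.element_count += 1

  def update_for_batch(
      self, batch: Any, get_length: Callable[[Any], int] = len) -> None:
    # Count every element in the batch, not the batch itself.
    self.element_count += get_length(batch)
    # TODO: Record the byte size as well, possibly via the batch converter.
```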



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   Maybe worth logging a warning at least once?
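
One way to log such a warning only once per consumer set, sketched with the standard `logging` module. `RebatchWarner` and its arguments are hypothetical names, and the message wording is only a suggestion:

```python
import logging
from typing import Any

_LOGGER = logging.getLogger(__name__)


class RebatchWarner:
  """Emits the explode-and-rebatch warning at most once per instance."""
  def __init__(self) -> None:
    self._warned = False

  def warn_once(self, producer_type: Any, consumer_type: Any) -> bool:
    """Logs on the first call only; returns True if a warning was logged."""
    if self._warned:
      return False
    self._warned = True
    _LOGGER.warning(
        "Consumer batch type %r does not match producer batch type %r; "
        "batches will be exploded and rebatched, which may be slow.",
        consumer_type,
        producer_type)
    return True
```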



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv
+
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class ConcreteWindowedBatch(WindowedBatch):
+  cdef public object values

Review Comment:
   Why not store this as a list of WindowedValues?



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv
+
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class ConcreteWindowedBatch(WindowedBatch):

Review Comment:
   Heterogeneous? 



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:

Review Comment:
   Is this ever needed/used?



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):

Review Comment:
   Is this (and pane_info and windows) actually used anywhere?



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),

Review Comment:
   Pulling timestamp etc. out as Python objects could be expensive. Could use wv.with_value(None) as the key. 
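
The idea can be sketched as below, using a stand-in class rather than the real `WindowedValue`. `StandInWindowedValue` and `group_by_metadata` are hypothetical; the sketch assumes the real class is hashable and compares by its fields, as the frozen dataclass here does:

```python
import collections
from dataclasses import dataclass, replace
from typing import Any, Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class StandInWindowedValue:
  """Hypothetical stand-in for WindowedValue; not the Beam class."""
  value: Any
  timestamp_micros: int
  windows: Tuple[str, ...]
  pane_info: str = "UNKNOWN"

  def with_value(self, new_value: Any) -> "StandInWindowedValue":
    return replace(self, value=new_value)


def group_by_metadata(
    windowed_values: Iterable[StandInWindowedValue]
) -> Dict[StandInWindowedValue, List[Any]]:
  grouped = collections.defaultdict(list)
  for wv in windowed_values:
    # The value-less copy carries timestamp, windows and pane info, so it
    # can serve as the grouping key without unpacking each field.
    grouped[wv.with_value(None)].append(wv.value)
  return dict(grouped)
```

Each key can then be turned back into a homogeneous batch with `key.with_value(produce_fn(values))`, without reconstructing the timestamp, windows, or pane info.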



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,23 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+cdef class WindowedBatch(object):
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class HomogeneousWindowedBatch(WindowedBatch):
+  cdef public WindowedValue _wv
+
+  cpdef WindowedBatch with_values(self, object new_values)
+
+cdef class ConcreteWindowedBatch(WindowedBatch):
+  cdef public object values

Review Comment:
   Or, if the values need to be arbitrary batches, let's store this as (values, List[WindowedValue]), where the latter all have value None.
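
A small sketch of that layout, again with a stand-in for `WindowedValue`. `StandInWindowedValue` and `PerElementBatch` are hypothetical names, and `explode_fn` is assumed to yield one value per stored metadata entry, in order:

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Iterable, Iterator, List, Tuple


@dataclass(frozen=True)
class StandInWindowedValue:
  """Hypothetical stand-in for WindowedValue; not the Beam class."""
  value: Any
  timestamp_micros: int
  windows: Tuple[str, ...]

  def with_value(self, new_value: Any) -> "StandInWindowedValue":
    return replace(self, value=new_value)


class PerElementBatch:
  """Stores a batch of values plus one value-less windowed value each."""
  def __init__(
      self, values: Any, metadata: List[StandInWindowedValue]) -> None:
    self.values = values
    self.metadata = metadata  # every entry has value None

  def as_windowed_values(
      self, explode_fn: Callable[[Any], Iterable[Any]]
  ) -> Iterator[StandInWindowedValue]:
    # Re-attach each exploded value to its stored metadata.
    for value, meta in zip(explode_fn(self.values), self.metadata):
      yield meta.with_value(value)
```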



##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -329,7 +432,9 @@ def finish(self):
     # type: () -> None
 
     """Finish operation."""
-    pass
+    # TODO: Do we need an output_index here

Review Comment:
   I don't think so; we should be able to finish everything here. 



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:

Review Comment:
   Do we need an enum here, or should we just implement this on the subclasses, e.g. HomogeneousWindowedBatch.from_windowed_values to get a set of homogeneous windowed values? 
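
The enum-free shape might look roughly like this. The sketch uses plain `(value, metadata)` pairs in place of `WindowedValue`, and `Batch`, `HomogeneousBatch`, `PerElementBatch`, and `from_pairs` are hypothetical names chosen for illustration:

```python
import collections
from typing import Any, Callable, Hashable, Iterable, Iterator, List, Tuple

# (value, metadata) pair standing in for a WindowedValue.
Pair = Tuple[Any, Hashable]


class Batch:
  @classmethod
  def from_pairs(
      cls, pairs: Iterable[Pair],
      produce_fn: Callable[[List[Any]], Any]) -> Iterator["Batch"]:
    raise NotImplementedError


class HomogeneousBatch(Batch):
  def __init__(self, values: Any, metadata: Hashable) -> None:
    self.values = values
    self.metadata = metadata

  @classmethod
  def from_pairs(cls, pairs, produce_fn):
    # One batch per distinct metadata, in first-seen order.
    grouped = collections.defaultdict(list)
    for value, metadata in pairs:
      grouped[metadata].append(value)
    for metadata, values in grouped.items():
      yield cls(produce_fn(values), metadata)


class PerElementBatch(Batch):
  def __init__(self, values: Any, metadata: List[Hashable]) -> None:
    self.values = values
    self.metadata = metadata

  @classmethod
  def from_pairs(cls, pairs, produce_fn):
    # A single batch that keeps the metadata of every element.
    pairs = list(pairs)
    yield cls(
        produce_fn([value for value, _ in pairs]),
        [metadata for _, metadata in pairs])
```

Callers then pick the batching behavior by choosing the class, so no mode argument is needed.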



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:

Review Comment:
   Maybe we would want to hide the concrete subclasses, but it seems like this always gets overridden. Perhaps we don't need the concrete implementation until we solve the questions of batching + windowing in the API.



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):

Review Comment:
   Actually, does this ever get used? 



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    return create_batch(
+        new_values, self.timestamps_micros, self.windows, self.pane_infos)
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value, timestamp, windows, pane_info in zip(explode_fn(self.values),
+                                                    self.timestamps_micros,
+                                                    self.windows,
+                                                    self._pane_infos_iter()):
+      yield create(value, timestamp, windows, pane_info)
+
+  def _pane_infos_iter(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      return itertools.repeat(self.pane_infos, len(self.timestamps_micros))
+    else:
+      return self.pane_infos
+
+  def __eq__(self, other):
+    if isinstance(other, ConcreteWindowedBatch):
+      return (
+          type(self) == type(other) and
+          self.timestamps_micros == other.timestamps_micros and
+          self.values == other.values and self.windows == other.windows and
+          self.pane_infos == other.pane_infos)
+    return NotImplemented
+
+  def __hash__(self):
+    if isinstance(self.pane_infos, PaneInfo):
+      pane_infos_hash = hash(self.pane_infos)
+    else:
+      pane_infos_hash = sum(hash(p) for p in self.pane_infos)
+
+    return ((hash(self.values) & 0xFFFFFFFFFFFFFFF) + 3 *
+            (sum(self.timestamps_micros) & 0xFFFFFFFFFFFFFF) + 7 *
+            (sum(hash(w) for w in self.windows) & 0xFFFFFFFFFFFFF) + 11 *
+            (pane_infos_hash & 0xFFFFFFFFFFFFF))
+
+
+def create_batch(

Review Comment:
   This indirection was for performance reasons; just inline it above. 



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]

Review Comment:
   Let's avoid making this optional; for WindowedValue that was only for backwards compatibility. (Per the suggestion above, perhaps this signature should be different anyway, and we'd avoid things like convert_timestamp...)



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter
+  def windows(self, value):
+    self._wv.windows = value
+
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+    return HomogeneousWindowedBatch(self._wv.with_value(new_values))
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    for value in explode_fn(self._wv.value):
+      yield self._wv.with_value(value)
+
+  def __eq__(self, other):
+    if isinstance(other, HomogeneousWindowedBatch):
+      return self._wv == other._wv
+    return NotImplemented
+
+  def __hash__(self):
+    return hash(self._wv)
+
+
+class ConcreteWindowedBatch(WindowedBatch):
+  """A concrete WindowedBatch where all event-time information is stored
+  independently for each element.
+
+  Attributes:
+    values: The underlying values of the windowed batch.
+    timestamp: An iterable of timestamps associated with the value as seconds
+      since Unix epoch.
+    windows: An iterable with a set (iterable) of window objects for each value.
+      The window objects are descendants of the BoundedWindow class.
+    pane_info: An iterable of PaneInfo descriptors describing the triggering
+      information for the pane that contained each value. Alternatively, a
+      single PaneInfo may be specified to use for every value. If None, will be
+      set to PANE_INFO_UNKNOWN.
+  """
+  def __init__(
+      self,
+      values,
+      timestamps,  # type: Sequence[TimestampTypes]
+      windows,  # type: Iterable[Tuple[BoundedWindow, ...]]
+      pane_infos=PANE_INFO_UNKNOWN  # type: Union[Iterable[PaneInfo],PaneInfo]
+  ):
+    self.values = values
+
+    def convert_timestamp(timestamp: TimestampTypes) -> int:
+      if isinstance(timestamp, int):
+        return timestamp * 1000000
+      else:
+        # TODO: Cache Timestamp object as in WindowedValue?
+        timestamp_object = (
+            timestamp
+            if isinstance(timestamp, Timestamp) else Timestamp.of(timestamp))
+        return timestamp_object.micros
+
+    self.timestamp_objects: Optional[List[Timestamp]] = None
+    self.timestamps_micros = [convert_timestamp(t) for t in timestamps]
+    self.windows = windows
+    #TODO: Should we store length?
+    #self.length = length
+    self.pane_infos = pane_infos
+
+  @property
+  def timestamps(self) -> Sequence[Timestamp]:
+    if self.timestamp_objects is None:
+      self.timestamp_objects = [
+          Timestamp(0, micros) for micros in self.timestamps_micros
+      ]
+
+    return self.timestamp_objects
+
+  def with_values(self, new_values):

Review Comment:
   Presumably there are some constraints on the size and ordering of new_values?
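
A minimal sketch of the constraint this comment asks about, assuming that element i of the new values inherits timestamp i of the batch. `SketchConcreteBatch` is a hypothetical stand-in written for illustration; it is not the `ConcreteWindowedBatch` from the PR, and the length check is an assumption about what such validation could look like.

```python
from typing import Any, List, Sequence


class SketchConcreteBatch:
  """Hypothetical stand-in for ConcreteWindowedBatch (not Beam's class)."""
  def __init__(self, values: Sequence[Any], timestamps_micros: List[int]):
    if len(values) != len(timestamps_micros):
      raise ValueError(
          f"got {len(values)} values for {len(timestamps_micros)} timestamps")
    self.values = values
    self.timestamps_micros = timestamps_micros

  def with_values(self, new_values: Sequence[Any]) -> 'SketchConcreteBatch':
    # Element i of new_values inherits timestamp i, so the size must match.
    # Ordering cannot be checked here; the caller has to preserve it.
    if len(new_values) != len(self.timestamps_micros):
      raise ValueError(
          f"expected {len(self.timestamps_micros)} values, "
          f"got {len(new_values)}")
    return SketchConcreteBatch(new_values, self.timestamps_micros)
```

The size can be validated cheaply, but the ordering requirement can only be documented, which is one reason to state it explicitly in the docstring.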



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter

Review Comment:
   Where is this used? I'd prefer this be immutable. 
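
One possible shape for an immutable alternative to the setter, sketched under assumptions: `SketchWindowedValue`, `SketchHomogeneousBatch` and `with_windows` are hypothetical names introduced here for illustration, not part of the PR or of Beam. Instead of mutating the wrapped value, a copy with the new windows is returned.

```python
from typing import Any, Iterable, NamedTuple, Tuple


class SketchWindowedValue(NamedTuple):
  """Hypothetical, simplified stand-in for WindowedValue."""
  value: Any
  timestamp: int
  windows: Tuple[Any, ...]


class SketchHomogeneousBatch:
  """Hypothetical stand-in for HomogeneousWindowedBatch without a setter."""
  def __init__(self, wv: SketchWindowedValue):
    self._wv = wv

  @property
  def windows(self) -> Tuple[Any, ...]:
    return self._wv.windows

  def with_windows(self, new_windows: Iterable[Any]) -> 'SketchHomogeneousBatch':
    # Return a new batch rather than mutating the shared wrapped value.
    return SketchHomogeneousBatch(
        self._wv._replace(windows=tuple(new_windows)))
```

Because `windows` has no setter, assigning to it raises `AttributeError`, so a caller that needs different windows has to go through `with_windows` and leaves the original batch untouched.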





[GitHub] [beam] robertwb commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863270883


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   No, it will be logged once per instance (which could be a lot, hundreds of times in streaming). 
   
   Ideally we could just name the two operations involved. I wouldn't log(error), just log(warning). It could be actionable if someone expects the batch converters to line up and they don't. 
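
A sketch of one way to emit such a warning at most once per producer/consumer pair in a process, rather than once per ConsumerSet instance. The function name, the module-level set and the message text are all assumptions made for illustration; none of them come from the PR.

```python
import logging
from typing import Set, Tuple

_LOGGER = logging.getLogger(__name__)

# Hypothetical process-wide record of edges that were already reported.
_warned_edges: Set[Tuple[str, str]] = set()


def warn_rebatch_once(producer_name: str, consumer_name: str) -> bool:
  """Logs a rebatching warning once per (producer, consumer) pair.

  Returns True if a warning was logged by this call, False otherwise.
  """
  edge = (producer_name, consumer_name)
  if edge in _warned_edges:
    return False
  _warned_edges.add(edge)
  _LOGGER.warning(
      "Batches produced by %s must be exploded and rebatched for %s "
      "because their batch converters do not match.",
      producer_name,
      consumer_name)
  return True
```

Naming both operations keeps the message actionable for someone who expected the batch converters to line up.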



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:

Review Comment:
   +1



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results may be a generator, so we cannot call len() on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)
+      # TODO(BEAM-14292): Add TimestampedBatch, an analogue for TimestampedValue

Review Comment:
   Maybe. I'd actually rather deprecate emitting TimestampedValues and have a separate AssignTimestamps operation. 



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1200,6 +1404,13 @@ def process(self, windowed_value):
       self._reraise_augmented(exn)
       return []
 
+  def process_batch(self, windowed_batch):

Review Comment:
   Here (or elsewhere) it seems we should document that this will only be called with the kind of batch that this DoFn supports, right?



##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedBatch.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):

Review Comment:
   Ah, yes. 



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results may be a generator, so we cannot call len() on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)
+      # TODO(BEAM-14292): Add TimestampedBatch, an analogue for TimestampedValue
+      # and handle it here (see TimestampedValue logic in process_outputs).
+      else:
+        # TODO: This should error unless the DoFn was defined with
+        # @DoFn.yields_batches(output_aligned_with_input=True)
+        # We should consider also validating that the length is the same as
+        # windowed_input_batch
+        windowed_batch = windowed_input_batch.with_values(result)

Review Comment:
   This is safe if the input is a homogeneous batch. Otherwise, let's throw an exception for now.
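
The rule suggested here could be sketched as follows. All three classes and the function `wrap_unwindowed_result` are hypothetical stand-ins written for illustration; they only mirror the general shape of the classes in the diff. Reusing the input's windowing is safe for a homogeneous batch because every output element receives the same timestamp and windows, regardless of how many elements the output has or how they are ordered.

```python
from typing import Any, Sequence, Tuple


class SketchWindowedBatch:
  """Hypothetical stand-in for WindowedBatch; not Beam's class."""


class SketchHomogeneousBatch(SketchWindowedBatch):
  """All elements share one timestamp and one set of windows."""
  def __init__(self, values: Any, timestamp: int, windows: Tuple[Any, ...]):
    self.values = values
    self.timestamp = timestamp
    self.windows = windows

  def with_values(self, new_values: Any) -> 'SketchHomogeneousBatch':
    return SketchHomogeneousBatch(new_values, self.timestamp, self.windows)


class SketchConcreteBatch(SketchWindowedBatch):
  """Each element carries its own timestamp."""
  def __init__(self, values: Any, timestamps: Sequence[int]):
    self.values = values
    self.timestamps = timestamps


def wrap_unwindowed_result(
    input_batch: SketchWindowedBatch, result: Any) -> SketchWindowedBatch:
  if isinstance(input_batch, SketchHomogeneousBatch):
    # Every output element gets the single shared timestamp and windows.
    return input_batch.with_values(result)
  # Per-element windowing cannot be carried over without knowing that the
  # output is aligned with the input, so refuse for now.
  raise NotImplementedError(
      "Bare batch outputs are only supported for homogeneous input batches, "
      f"got {type(input_batch).__name__}")
```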



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1

Review Comment:
   Increment by the batch size. 
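
One way to read this suggestion, sketched with hypothetical names: each item yielded by a batched DoFn is itself a batch, so the counter should grow by the number of elements in that batch instead of by one. The `get_length` parameter is an assumption for illustration; it defaults to `len` and could be swapped for whatever length function the batch type provides.

```python
def count_output_elements(results, get_length=len):
    """Count elements across an iterable of batches.

    `results` may be a generator, so the total is accumulated while
    iterating instead of calling len() on `results` itself.
    """
    total = 0
    for batch in results:
        total += get_length(batch)
    return total
```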



##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1
+      tag = None
+      if isinstance(result, TaggedOutput):
+        tag = result.tag
+        if not isinstance(tag, str):
+          raise TypeError('In %s, tag %s is not a string' % (self, tag))
+        result = result.value
+      if isinstance(result, (WindowedValue, TimestampedValue)):
+        raise TypeError(
+            f"Received {type(result).__name__} from DoFn that was "
+            "expected to produce a batch.")
+      if isinstance(result, WindowedBatch):
+        if isinstance(result, ConcreteWindowedBatch):
+          # TODO: Rebatch into homogeneous batches (or remove
+          # ConcreteWindowedBatch)
+          raise NotImplementedError
+        elif isinstance(result, HomogeneousWindowedBatch):
+          windowed_batch = result
+        else:
+          raise AssertionError(
+              "Unrecognized WindowedBatch implementation: "
+              f"{type(result)}")
+
+        if (windowed_input_batch is not None and
+            len(windowed_input_batch.windows) != 1):
+          windowed_batch.windows *= len(windowed_input_batch.windows)

Review Comment:
   Oh, here's where you use window assignment. I'm not sure this is safe....
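
One possible reading of the concern, shown as a sketch with a hypothetical `Batch` class (not the PR's types): assigning to `windows` on the result mutates an object that the DoFn, or another consumer, may still hold. Building a new object leaves the original untouched.

```python
class Batch:
    """Hypothetical batch holding values and a tuple of windows."""
    def __init__(self, values, windows):
        self.values = values
        self.windows = windows


def repeat_windows_in_place(batch, n):
    # Rebinds the attribute on the shared object: every holder sees it.
    batch.windows *= n
    return batch


def repeat_windows_copy(batch, n):
    # Leaves the caller's object as it was and returns a new one.
    return Batch(batch.values, batch.windows * n)
```

Whether the extra allocation is acceptable on this code path is a separate question that the thread leaves open.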





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1115448906

   Thanks @robertwb, I replied to all the questions I can answer off the top of my head. I'll take a pass at addressing the other comments and let you know.




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863244420


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   If we log this when populating other_batch_consumers in `__init__`, will it only be logged once (per worker)?
   
   I'm also a little worried that the error will be opaque to the user and un-actionable. Any thoughts on how to make it more helpful? I suppose we can recognize the DoOperation-to-DoOperation case and make that nice at least.
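
To make the question concrete, here is a simplified sketch of the three-way partition from the hunk above, with the log call placed where the rebatch group is populated. `Consumer` is a hypothetical dataclass standing in for `Operation`, batch types are plain strings instead of `BatchConverter` objects, and the message text is made up. Because the log call runs during partitioning, it fires once per constructed consumer set, not once per batch.

```python
import collections
import logging
from dataclasses import dataclass
from typing import Optional

_LOGGER = logging.getLogger(__name__)


@dataclass(frozen=True)
class Consumer:
    """Hypothetical stand-in for an Operation and its batching preference."""
    name: str
    supports_batches: bool
    supports_elements: bool
    batch_type: Optional[str] = None


def partition_consumers(consumers, producer_batch_type):
    element, passthrough = [], []
    rebatch = collections.defaultdict(list)
    for c in consumers:
        if not c.supports_batches:
            element.append(c)
        elif c.batch_type == producer_batch_type:
            passthrough.append(c)
        elif c.supports_elements:
            # Mismatched batch type, but elements are acceptable.
            element.append(c)
        else:
            # Last resort: explode and rebatch. Logged at construction time.
            _LOGGER.warning(
                "Input to %s will be exploded and rebatched from %s to %s.",
                c.name, producer_batch_type, c.batch_type)
            rebatch[c.batch_type].append(c)
    return element, passthrough, dict(rebatch)
```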





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r863245554


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:

Review Comment:
   Yeah, I think it makes sense to just keep it simple and tear out Concrete(Heterogeneous)WindowedBatch for now.
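
For reference, the homogeneous path that would remain can be sketched in isolation. This is an illustration under assumptions, not the PR's code: windowed values are modeled as plain `(value, timestamp, windows, pane_info)` tuples, and `group_homogeneous` is a hypothetical name. Values that share the same event-time metadata end up in one batch.

```python
import collections


def group_homogeneous(windowed_values, produce_fn=list):
    """Group values by (timestamp, windows, pane_info) and batch each group."""
    grouped = collections.defaultdict(list)
    for value, timestamp, windows, pane_info in windowed_values:
        # Windows are converted to a tuple so the key is hashable.
        grouped[(timestamp, tuple(windows), pane_info)].append(value)

    for (timestamp, windows, pane_info), values in grouped.items():
        yield produce_fn(values), timestamp, windows, pane_info
```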





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864048437


##########
sdks/python/apache_beam/utils/windowed_value.py:
##########
@@ -279,6 +293,208 @@ def create(value, timestamp_micros, windows, pane_info=PANE_INFO_UNKNOWN):
   return wv
 
 
+class BatchingMode(Enum):
+  CONCRETE = 1
+  HOMOGENEOUS = 2
+
+
+class WindowedBatch(object):
+  """A batch of N windowed values, each having a value, a timestamp and set of
+  windows."""
+  def with_values(self, new_values):
+    # type: (Any) -> WindowedBatch
+
+    """Creates a new WindowedBatch with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    raise NotImplementedError
+
+  def as_windowed_values(self, explode_fn: Callable) -> Iterable[WindowedValue]:
+    raise NotImplementedError
+
+  @staticmethod
+  def from_windowed_values(
+      windowed_values: Sequence[WindowedValue],
+      *,
+      produce_fn: Callable,
+      mode: BatchingMode = BatchingMode.CONCRETE) -> Iterable['WindowedBatch']:
+    if mode == BatchingMode.HOMOGENEOUS:
+      import collections
+      grouped = collections.defaultdict(lambda: [])
+      for wv in windowed_values:
+        grouped[(wv.timestamp, tuple(wv.windows),
+                 wv.pane_info)].append(wv.value)
+
+      for key, values in grouped.items():
+        timestamp, windows, pane_info = key
+        yield HomogeneousWindowedBatch.of(
+            produce_fn(values), timestamp, windows, pane_info)
+    elif mode == BatchingMode.CONCRETE:
+      yield ConcreteWindowedBatch(
+          produce_fn([wv.value for wv in windowed_values]),
+          [wv.timestamp
+           for wv in windowed_values], [wv.windows for wv in windowed_values],
+          [wv.pane_info for wv in windowed_values])
+    else:
+      raise AssertionError(
+          "Unrecognized BatchingMode in "
+          f"WindowedBatch.from_windowed_values: {mode!r}")
+
+
+class HomogeneousWindowedBatch(WindowedBatch):
+  """A WindowedBatch with Homogeneous event-time information, represented
+  internally as a WindowedValue.
+  """
+  def __init__(self, wv):
+    self._wv = wv
+
+  @staticmethod
+  def of(values, timestamp, windows, pane_info=PANE_INFO_UNKNOWN):
+    return HomogeneousWindowedBatch(
+        WindowedValue(values, timestamp, windows, pane_info))
+
+  @property
+  def values(self):
+    return self._wv.value
+
+  @property
+  def timestamp(self):
+    return self._wv.timestamp
+
+  @property
+  def pane_info(self):
+    return self._wv.pane_info
+
+  @property
+  def windows(self):
+    return self._wv.windows
+
+  @windows.setter

Review Comment:
   I'll resolve this comment. Let's keep discussing this in your comment where you discovered window assignment.





[GitHub] [beam] robertwb commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
robertwb commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864277074


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   I think the warnings package has the ability to warn just once. Otherwise, a module-level bool is fine. And, yes, I think Operations have the user-set stage name available (most of the time at least). 
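
A small sketch of the warn-once behavior mentioned here, using the standard `warnings` module. `RebatchWarning`, `warn_rebatch`, and the message text are hypothetical names chosen for illustration. With the `"once"` action, each distinct message and category pair is shown a single time, so including the step name in the message yields one warning per step.

```python
import warnings


class RebatchWarning(UserWarning):
    """Hypothetical category for the explode-and-rebatch fallback."""


# "once": show each distinct (message, category) pair only the first time.
warnings.simplefilter("once", RebatchWarning)


def warn_rebatch(step_name):
    warnings.warn(
        f"Input to {step_name!r} is exploded and rebatched because its "
        "batch type does not match the producer's.",
        RebatchWarning,
        stacklevel=2)
```

A module-level boolean would work too, at the cost of warning only for the first affected step.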





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864334832


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)
+
+    self.update_counters_finish()
+
+  def receive_batch(self, windowed_batch):
+    #self.update_counters_start(windowed_value)
+    if self.element_consumers:
+      for wv in windowed_batch.as_windowed_values(
+          self.producer_batch_converter.explode_batch):
+        for consumer in self.element_consumers:
+          cython.cast(Operation, consumer).process(wv)
+
+    for consumer in self.passthrough_batch_consumers:
+      cython.cast(Operation, consumer).process_batch(windowed_batch)
+
+    for (consumer_batch_converter,
+         consumers) in self.other_batch_consumers.items():
+      # Explode and rebatch into the new batch type (ouch!)

Review Comment:
   It looks like we could do something like BeamDeprecationWarning to make sure the warning is just logged once: https://github.com/apache/beam/blob/be7823d0a81b00bb0f036b129176dba858e8f6c0/sdks/python/apache_beam/utils/annotations.py#L91-L96
   
   Unfortunately, I realized we don't have the producing operation at hand. I'll have to see if there's a lightweight way to get it when we need it.





[GitHub] [beam] TheNeuralBit merged pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit merged PR #17384:
URL: https://github.com/apache/beam/pull/17384




[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864374894


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -223,6 +235,120 @@ def current_element_progress(self):
     return self.consumer.current_element_progress()
 
 
+class GeneralPurposeConsumerSet(ConsumerSet):
+  """ConsumerSet implementation that handles all combinations of possible edges.
+  """
+  def __init__(self,
+               counter_factory,
+               step_name,  # type: str
+               output_index,
+               coder,
+               producer_type_hints,
+               consumers,  # type: List[Operation]
+               producer_batch_converter):
+    super().__init__(
+        counter_factory,
+        step_name,
+        output_index,
+        consumers,
+        coder,
+        producer_type_hints)
+
+    self.producer_batch_converter = producer_batch_converter
+
+    # Partition consumers into three groups:
+    # - consumers that will be passed elements
+    # - consumers that will be passed batches (where their input batch type
+    #   matches the output of the producer)
+    # - consumers that will be passed converted batches
+    self.element_consumers: List[Operation] = []
+    self.passthrough_batch_consumers: List[Operation] = []
+    other_batch_consumers: DefaultDict[
+        BatchConverter, List[Operation]] = collections.defaultdict(lambda: [])
+
+    for consumer in consumers:
+      if not consumer.get_batching_preference().supports_batches:
+        self.element_consumers.append(consumer)
+      elif (consumer.get_input_batch_converter() ==
+            self.producer_batch_converter):
+        self.passthrough_batch_consumers.append(consumer)
+      else:
+        # Batch consumer with a mismatched batch type
+        if consumer.get_batching_preference().supports_elements:
+          # Pass it elements if we can
+          self.element_consumers.append(consumer)
+        else:
+          # As a last resort, explode and rebatch
+          consumer_batch_converter = consumer.get_input_batch_converter()
+          # This consumer supports batches, it must have a batch converter
+          assert consumer_batch_converter is not None
+          other_batch_consumers[consumer_batch_converter].append(consumer)
+
+    self.other_batch_consumers: Dict[BatchConverter, List[Operation]] = dict(
+        other_batch_consumers)
+
+    self.has_batch_consumers = (
+        self.passthrough_batch_consumers or self.other_batch_consumers)
+    self._batched_elements: List[Any] = []
+
+  def receive(self, windowed_value):
+    # type: (WindowedValue) -> None
+    self.update_counters_start(windowed_value)
+
+    for consumer in self.element_consumers:
+      cython.cast(Operation, consumer).process(windowed_value)
+
+    # TODO: Do this branching when constructing ConsumerSet
+    if self.has_batch_consumers:
+      self._batched_elements.append(windowed_value)

Review Comment:
   Done, I limited this buffer to 4096 elements. I also made sure to verify this in test_batch_pardo.
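
The idea of a size-limited element buffer can be sketched as follows. Only the 4096 limit comes from the comment above; the class name, the `flush_fn` callback, and the flush-when-full policy are assumptions for illustration and may differ from what the PR does.

```python
class BoundedBatchBuffer:
    """Collects elements and hands them off in chunks of at most max_size."""
    def __init__(self, flush_fn, max_size=4096):
        self._flush_fn = flush_fn
        self._max_size = max_size
        self._elements = []

    def add(self, element):
        self._elements.append(element)
        if len(self._elements) >= self._max_size:
            self.flush()

    def flush(self):
        # Swap in a fresh list before calling out, so a re-entrant add()
        # from flush_fn does not touch the batch being delivered.
        if self._elements:
            batch, self._elements = self._elements, []
            self._flush_fn(batch)
```

A final `flush()` is still needed when input ends, since the last partial chunk never reaches the size limit on its own.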





[GitHub] [beam] TheNeuralBit commented on a diff in pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on code in PR #17384:
URL: https://github.com/apache/beam/pull/17384#discussion_r864371637


##########
sdks/python/apache_beam/runners/common.py:
##########
@@ -1361,10 +1580,80 @@ def process_outputs(
         self.main_receivers.receive(windowed_value)
       else:
         self.tagged_receivers[tag].receive(windowed_value)
+
+    # TODO(BEAM-3937): Remove if block after output counter released.
+    # Only enable per_element_output_counter when counter cythonized
+    if self.per_element_output_counter is not None:
+      self.per_element_output_counter.add_input(output_element_count)
+
+  def process_batch_outputs(
+      self, windowed_input_batch, results, watermark_estimator=None):
+    # type: (WindowedValue, Iterable[Any], Optional[WatermarkEstimator]) -> None
+
+    """Dispatch the result of process computation to the appropriate receivers.
+
+    A value wrapped in a TaggedOutput object will be unwrapped and
+    then dispatched to the appropriate indexed output.
+    """
+    if results is None:
+      # TODO(BEAM-3937): Remove if block after output counter released.
+      # Only enable per_element_output_counter when counter cythonized.
+      if self.per_element_output_counter is not None:
+        self.per_element_output_counter.add_input(0)
+      return
+
+    # TODO(BEAM-10782): Verify that the results object is a valid iterable type
+    #  if performance_runtime_type_check is active, without harming performance
+
+    output_element_count = 0
+    for result in results:
+      # results here may be a generator, which cannot call len on it.
+      output_element_count += 1

Review Comment:
   Done





[GitHub] [beam] TheNeuralBit commented on pull request #17384: [BEAM-14294] Worker changes to support trivial Batched DoFns

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on PR #17384:
URL: https://github.com/apache/beam/pull/17384#issuecomment-1117527067

   Run Portable_Python PreCommit

