Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/03/29 23:59:35 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #17175: [WIP] Initial Implementation of Batched DoFns for Python

TheNeuralBit commented on a change in pull request #17175:
URL: https://github.com/apache/beam/pull/17175#discussion_r837999130



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -121,6 +122,27 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    class MultiplyDoFn(beam.DoFn):
+      def process_batch(self, batch: np.ndarray) -> np.ndarray:
+        assert isinstance(batch, np.ndarray)
+        yield batch * 2
+
+      # infer_output_type must be defined (when there's no process method),
+      # otherwise we don't know the input type is the same as output type.

Review comment:
      We could also use `with_input_types(T)` and `with_output_types(T)`, where `T` is a generic type variable.
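The generic-type idea behind that suggestion can be illustrated with the standard library alone (this is a sketch of the concept, not Beam's actual type-inference code): annotating input and output with the same type variable `T` declares "output type equals input type" without naming a concrete type.

```python
from typing import Iterator, TypeVar, get_type_hints

T = TypeVar('T')

def process_batch(batch: T) -> Iterator[T]:
    # Using the same variable T in both positions tells a type inferencer
    # that whatever type flows in is the type that flows out, which is
    # exactly what a hand-written infer_output_type would otherwise state.
    yield batch
```

With Beam's `with_input_types(T)`/`with_output_types(T)` decorators the binding would be expressed on the transform rather than via annotations, but the inference consequence is the same.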

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -772,6 +772,14 @@ def _infer_result_type(
       input_element_type = (
           input_element_types_tuple[0] if len(input_element_types_tuple) == 1
           else typehints.Union[input_element_types_tuple])
+      input_batch_type = inputs[0].batch_type
+
+      # All defined batch_types should be equivalent
+      # TODO: Nicer error

Review comment:
      There can't actually be multiple inputs to a ParDo. We should explicitly test the Flatten case.

##########
File path: sdks/python/apache_beam/utils/windowed_value.py
##########
@@ -270,6 +274,79 @@ def __reduce__(self):
 
 
 # TODO(robertwb): Move this to a static method.
+
+
+class WindowedBatch(object):

Review comment:
      Maybe make this an interface, and keep this class as its first concrete implementation (future implementations may store timestamps inside the value, for example).
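One way the suggested refactor could look (a hypothetical sketch using only the standard library; the method names and the `HomogeneousWindowedBatch` name are assumptions, not the actual Beam code): `WindowedBatch` becomes an abstract interface, and the current batch-with-shared-metadata behavior becomes one concrete implementation behind it.

```python
import abc

class WindowedBatch(abc.ABC):
    """Interface for a batch of values plus windowing metadata."""

    @abc.abstractmethod
    def with_values(self, new_values):
        """Return a copy of this batch with the values replaced."""

    @abc.abstractmethod
    def as_windowed_values(self, explode_fn):
        """Explode into per-element (value, timestamp, windows) tuples."""

class HomogeneousWindowedBatch(WindowedBatch):
    """Concrete implementation: one timestamp and window set for the whole
    batch. A future implementation could instead carry per-element
    timestamps inside the value, satisfying the same interface."""

    def __init__(self, values, timestamp, windows):
        self._values = values
        self._timestamp = timestamp
        self._windows = windows

    def with_values(self, new_values):
        return HomogeneousWindowedBatch(
            new_values, self._timestamp, self._windows)

    def as_windowed_values(self, explode_fn):
        return [(v, self._timestamp, self._windows)
                for v in explode_fn(self._values)]
```

Downstream code then depends only on the interface, so adding a per-element-timestamp batch later does not disturb callers.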

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
##########
@@ -121,6 +122,27 @@ def test_pardo(self):
           | beam.Map(lambda e: e + 'x'))
       assert_that(res, equal_to(['aax', 'bcbcx']))
 
+  def test_batch_pardo(self):
+    class MultiplyDoFn(beam.DoFn):
+      def process_batch(self, batch: np.ndarray) -> np.ndarray:
+        assert isinstance(batch, np.ndarray)
+        yield batch * 2

Review comment:
       Should we disallow generators, or require them?
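If the decision were to *require* generators, one stdlib-only way to enforce it (a sketch, not Beam's actual validation) is to check `inspect.isgeneratorfunction` at DoFn registration time, so a plain `return`-based `process_batch` fails loudly up front rather than obscurely at execution time.

```python
import inspect

def validate_process_batch(fn):
    # Reject process_batch implementations that are not generator functions.
    if not inspect.isgeneratorfunction(fn):
        raise TypeError(
            '%s must be a generator function (use yield, not return)'
            % fn.__name__)

def yields_batches(batch):
    yield batch * 2  # generator function: accepted by the check

def returns_batch(batch):
    return batch * 2  # plain return: rejected by the check
```

The inverse policy (disallowing generators) would just negate the same check.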

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -789,6 +797,10 @@ def _infer_result_type(
       # Any remaining type variables have no bindings higher than this scope.
       result_pcollection.element_type = typehints.bind_type_variables(
           result_element_type, {'*': typehints.Any})
+      # TODO: Where should we actually do this?
+      if isinstance(transform, ParDo):
+        transform.infer_batch_converter(input_element_type, input_batch_type)
+        result_pcollection.batch_type = transform.infer_output_batch_type()

Review comment:
      It's insufficient to rely on the next DoFn to resolve this batch converter.
  
  What if the output feeds a Flatten, or there's a fusion break? (To test the latter, add a side input between a and b.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org