You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/04/08 23:13:50 UTC

[GitHub] [beam] robertwb commented on a diff in pull request #17253: [BEAM-14213] Add API and construction time validation for Batched DoFns

robertwb commented on code in PR #17253:
URL: https://github.com/apache/beam/pull/17253#discussion_r846527918


##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+
+class BatchDoFn(beam.DoFn):
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+  def process_batch(self, batch: List[int], *args, **kwargs):
+    yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+  dofn: beam.DoFn
+  process_defined: bool

Review Comment:
   Aren't these both functions of dofn? Can we simply infer it? 



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,14 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+@cython.final

Review Comment:
   In particular, for non-1:1 batch processing, we may want to (optionally?) expose the windowing information as part of the batch itself, e.g. as additional columns (for a dataframe-like batch), or as a list of WindowedValues (for a List batch), or ???
   
   An alternative is to have batches of homogeneous windowing information (which would allow dropping the 1:1 requirement, at the expense of possibly smaller batches). 



##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+
+class BatchDoFn(beam.DoFn):
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]

Review Comment:
   I think we should get feedback from the list whether this should return an iterable of batches, or a single batch. There are pros and cons to each. 



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
   def infer_output_type(self, input_type):
     return self.fn.infer_output_type(input_type)
 
+  def infer_batch_converters(self, input_element_type):
+    # This assumes batch input implies batch output
+    # TODO: Define and handle yields_batches and yields_elements
+    if self.fn.process_batch_defined:
+      input_batch_type = self.fn.get_input_batch_type()
+
+      if input_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "an input type annoation")
+
+      output_batch_type = self.fn.get_output_batch_type()
+      if output_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "a return type annoation")
+
+      # Generate a batch converter to convert between the input type and the
+      # (batch) input type of process_batch
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=input_element_type, batch_type=input_batch_type)
+
+      # Generate a batch converter to convert between the output type and the
+      # (batch) output type of process_batch
+      output_element_type = self.infer_output_type(input_element_type)
+      self.fn.input_batch_converter = BatchConverter.from_typehints(

Review Comment:
   self.fn.output_batch_converter?



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+  def __init__(self, batch_type, element_type):
+    self._batch_type = batch_type
+    self._element_type = element_type
+
+  def produce_batch(self, elements: Sequence[E]) -> B:
+    """Convert an instance of List[E] to a single instance of B."""
+    raise NotImplementedError
+
+  def explode_batch(self, batch: B) -> Iterator[E]:
+    """Convert an instance of B to Iterator[E]."""
+    raise NotImplementedError
+
+  def combine_batches(self, batches: Sequence[B]) -> B:
+    raise NotImplementedError
+
+  def get_length(self, batch: B) -> int:
+    raise NotImplementedError
+
+  @staticmethod
+  def register(batching_util_fn):

Review Comment:
   I know it's likely messy, but this parameter would benefit from type hints (and maybe a docstring). 



##########
sdks/python/apache_beam/transforms/batch_dofn_test.py:
##########
@@ -0,0 +1,123 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""UnitTests for Batched DoFn (process_batch) API."""
+
+# pytype: skip-file
+
+import unittest
+from typing import Iterator
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+
+from parameterized import parameterized_class
+
+import apache_beam as beam
+
+
+class ElementDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+
+class BatchDoFn(beam.DoFn):
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnNoReturnAnnotation(beam.DoFn):
+  def process_batch(self, batch: List[int], *args, **kwargs):
+    yield [element * 2 for element in batch]
+
+
+class EitherDoFn(beam.DoFn):
+  def process(self, element: int, *args, **kwargs) -> Iterator[int]:
+    yield element
+
+  def process_batch(self, batch: List[int], *args,
+                    **kwargs) -> Iterator[List[int]]:
+    yield [element * 2 for element in batch]
+
+
+class BatchDoFnTestCase(NamedTuple):
+  dofn: beam.DoFn
+  process_defined: bool
+  process_batch_defined: bool
+  input_batch_type: Optional[type]
+  output_batch_type: Optional[type]
+
+
+def make_map_pardo():
+  return beam.Map(lambda x: x * 2).dofn
+
+
+@parameterized_class(
+    BatchDoFnTestCase.__annotations__.keys(),
+    [
+        BatchDoFnTestCase(
+            dofn=ElementDoFn(),
+            process_defined=True,
+            process_batch_defined=False,
+            input_batch_type=None,
+            output_batch_type=None),
+        BatchDoFnTestCase(
+            dofn=BatchDoFn(),
+            process_defined=False,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        BatchDoFnTestCase(
+            dofn=BatchDoFnNoReturnAnnotation(),
+            process_defined=False,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        BatchDoFnTestCase(
+            dofn=EitherDoFn(),
+            process_defined=True,
+            process_batch_defined=True,
+            input_batch_type=beam.typehints.List[int],
+            output_batch_type=beam.typehints.List[int]),
+        #BatchDoFnTestCase(
+        #    dofn=make_map_pardo().dofn,
+        #    process_defined=True,
+        #    process_batch_defined=False,
+        #    input_batch_type=None,
+        #    batch_output_type=None),
+    ],
+    class_name_func=lambda _,

Review Comment:
   This is hard to read, could some ()'s make yapf happier? 



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
   def infer_output_type(self, input_type):
     return self.fn.infer_output_type(input_type)
 
+  def infer_batch_converters(self, input_element_type):
+    # This assumes batch input implies batch output
+    # TODO: Define and handle yields_batches and yields_elements
+    if self.fn.process_batch_defined:
+      input_batch_type = self.fn.get_input_batch_type()
+
+      if input_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "an input type annoation")
+
+      output_batch_type = self.fn.get_output_batch_type()
+      if output_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "a return type annoation")
+
+      # Generate a batch converter to convert between the input type and the
+      # (batch) input type of process_batch
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=input_element_type, batch_type=input_batch_type)
+
+      # Generate a batch converter to convert between the output type and the
+      # (batch) output type of process_batch
+      output_element_type = self.infer_output_type(input_element_type)
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=output_element_type, batch_type=output_batch_type)
+
+  def infer_output_batch_type(self):
+    # TODO: Handle process() with @yields_batch

Review Comment:
   Let's file a JIRA for these TODOs related to `@yields_batch`. 



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+  def __init__(self, batch_type, element_type):
+    self._batch_type = batch_type
+    self._element_type = element_type
+
+  def produce_batch(self, elements: Sequence[E]) -> B:
+    """Convert an instance of List[E] to a single instance of B."""
+    raise NotImplementedError
+
+  def explode_batch(self, batch: B) -> Iterator[E]:
+    """Convert an instance of B to Iterator[E]."""
+    raise NotImplementedError
+
+  def combine_batches(self, batches: Sequence[B]) -> B:
+    raise NotImplementedError
+
+  def get_length(self, batch: B) -> int:
+    raise NotImplementedError
+
+  @staticmethod
+  def register(batching_util_fn):
+    BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+    return batching_util_fn
+
+  @staticmethod
+  def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+    for constructor in BATCH_CONVERTER_REGISTRY:
+      result = constructor(element_type, batch_type)
+      if result is not None:
+        return result
+
+    # TODO: Include explanations for realistic candidate BatchConverter
+    raise TypeError(
+        f"Unable to find BatchConverter for element_type {element_type!r} and "
+        f"batch_type {batch_type!r}")
+
+  @property
+  def batch_type(self):
+    return self._batch_type
+
+  @property
+  def element_type(self):
+    return self._element_type
+
+
+N = "ARBITRARY LENGTH DIMENSION"
+
+
+class NumpyBatchConverter(BatchConverter):

Review Comment:
   ListBatchConverter would be a useful one to have (both in practice, and as a canonical example). 



##########
sdks/python/apache_beam/typehints/batch.py:
##########
@@ -0,0 +1,191 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for type-hinting batched types for use in the Beam SDK.
+
+A batched type is a type B that is logically equivalent to Sequence[E], where E
+is some other type. Typically B has a different physical representation than
+Sequence[E] for performance reasons.
+
+A trivial example is B=np.array(dtype=np.int64), E=int.
+
+Batched type hints are used to enable more efficient processing of
+a PCollection[E], by allowing users to write DoFns that operate on
+multi-element partitions of the PCollection represented with type B."""
+
+from typing import Generic
+from typing import Iterator
+from typing import Optional
+from typing import Sequence
+from typing import TypeVar
+
+import numpy as np
+
+from apache_beam.typehints.typehints import TypeConstraint
+
+B = TypeVar('B')
+E = TypeVar('E')
+
+BATCH_CONVERTER_REGISTRY = []
+
+
+class BatchConverter(Generic[B, E]):
+  def __init__(self, batch_type, element_type):
+    self._batch_type = batch_type
+    self._element_type = element_type
+
+  def produce_batch(self, elements: Sequence[E]) -> B:
+    """Convert an instance of List[E] to a single instance of B."""
+    raise NotImplementedError
+
+  def explode_batch(self, batch: B) -> Iterator[E]:
+    """Convert an instance of B to Iterator[E]."""
+    raise NotImplementedError
+
+  def combine_batches(self, batches: Sequence[B]) -> B:
+    raise NotImplementedError
+
+  def get_length(self, batch: B) -> int:
+    raise NotImplementedError
+
+  @staticmethod
+  def register(batching_util_fn):
+    BATCH_CONVERTER_REGISTRY.append(batching_util_fn)
+    return batching_util_fn
+
+  @staticmethod
+  def from_typehints(*, element_type, batch_type) -> 'BatchConverter':
+    for constructor in BATCH_CONVERTER_REGISTRY:
+      result = constructor(element_type, batch_type)

Review Comment:
   Should the API be to throw (possibly with an explanatory message) rather than return None? 



##########
sdks/python/apache_beam/transforms/core.py:
##########
@@ -1302,6 +1349,42 @@ def default_type_hints(self):
   def infer_output_type(self, input_type):
     return self.fn.infer_output_type(input_type)
 
+  def infer_batch_converters(self, input_element_type):
+    # This assumes batch input implies batch output
+    # TODO: Define and handle yields_batches and yields_elements
+    if self.fn.process_batch_defined:
+      input_batch_type = self.fn.get_input_batch_type()
+
+      if input_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "an input type annoation")
+
+      output_batch_type = self.fn.get_output_batch_type()
+      if output_batch_type is None:
+        raise TypeError(
+            "process_batch method on {self.fn!r} does not have "
+            "a return type annoation")
+
+      # Generate a batch converter to convert between the input type and the
+      # (batch) input type of process_batch
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=input_element_type, batch_type=input_batch_type)
+
+      # Generate a batch converter to convert between the output type and the
+      # (batch) output type of process_batch
+      output_element_type = self.infer_output_type(input_element_type)
+      self.fn.input_batch_converter = BatchConverter.from_typehints(
+          element_type=output_element_type, batch_type=output_batch_type)
+
+  def infer_output_batch_type(self):
+    # TODO: Handle process() with @yields_batch
+    if not self.fn.process_batch_defined:

Review Comment:
   I wonder if here (and above) we could just let `self.fn.get_{input,output}_batch_type` handle checking for `process_batch_defined` and a future `yields_batch` rather than scattering this logic. 



##########
sdks/python/apache_beam/utils/windowed_value.pxd:
##########
@@ -43,6 +43,14 @@ cdef class WindowedValue(object):
 
   cpdef WindowedValue with_value(self, new_value)
 
+@cython.final

Review Comment:
   Let's make this an interface. (Maybe split it out into a different PR as well.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org