Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/08 02:06:58 UTC

[GitHub] [beam] udim commented on a change in pull request #12009: [BEAM-10258] Support type hint annotations on PTransform's expand()

udim commented on a change in pull request #12009:
URL: https://github.com/apache/beam/pull/12009#discussion_r451233193



##########
File path: sdks/python/apache_beam/typehints/decorators.py
##########
@@ -378,6 +378,56 @@ def has_simple_output_type(self):
         self.output_types and len(self.output_types[0]) == 1 and
         not self.output_types[1])
 
+  def strip_pcoll(self):
+    return self.strip_pcoll_helper(self.input_types,
+                                   self._has_input_types,
+                                   {'input_types': None},
+                                   ['apache_beam.pvalue.PBegin'],
+                                   'An input typehint to a PTransform must be'
+                                   ' a single (or nested) type wrapped by '
+                                   'a PCollection or PBegin. ',
+                                   'strip_pcoll_input()').\
+                strip_pcoll_helper(self.output_types,
+                                   self.has_simple_output_type,
+                                   {'output_types': None},
+                                   ['apache_beam.pvalue.PDone'],
+                                   'An output typehint to a PTransform must be'
+                                   ' a single (or nested) type wrapped by '
+                                   'a PCollection or PDone. ',
+                                   'strip_pcoll_output()')
+
+  def strip_pcoll_helper(
+      self,
+      my_type,            # type: any
+      has_my_type,        # type: Callable[[], bool]
+      kwarg_dict,         # type: Dict[str, any]
+      my_valid_classes,   # type: List[str]
+      error_str,          # type: str
+      source_str          # type: str
+      ):
+    # type: (...) -> IOTypeHints
+
+    if not has_my_type() or len(my_type[0]) != 1:
+      return self
+
+    my_type = my_type[0][0]
+
+    if isinstance(my_type, typehints.AnyTypeConstraint):
+      return self
+
+    valid_classes = ['apache_beam.pvalue.PCollection'] + my_valid_classes
+
+    if not any(valid_class in str(my_type) for valid_class in valid_classes):

Review comment:
       Are string representations necessary here? We typically don't compare by string.
   
   You can also write `if not isinstance(my_type, valid_classes)`.
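
   A rough sketch of what a type-based check could look like, using only the standard `typing` module. `PCollection` and `PBegin` below are local stand-ins for the Beam classes, and `is_valid_hint` is a hypothetical helper, not something in the PR. One caveat that seems worth noting: `isinstance` expects a class or a tuple of classes, and a subscripted hint such as `PCollection[int]` is not itself an instance of `PCollection`, so the sketch looks at the hint's `__origin__` instead.

```python
from typing import Generic, TypeVar

T = TypeVar('T')


class PCollection(Generic[T]):  # stand-in for apache_beam.pvalue.PCollection
  pass


class PBegin(object):  # stand-in for apache_beam.pvalue.PBegin
  pass


def is_valid_hint(hint, valid_classes):
  """Hypothetical helper: compare by class instead of by str(hint)."""
  # A subscripted generic like PCollection[int] exposes the unsubscripted
  # class as __origin__; a bare class falls back to itself.
  origin = getattr(hint, '__origin__', hint)
  return isinstance(origin, type) and issubclass(origin, valid_classes)


valid_classes = (PCollection, PBegin)  # a tuple of classes, not strings
print(is_valid_hint(PCollection[int], valid_classes))
print(is_valid_hint(int, valid_classes))
```

   Unlike the substring test, this does not accept an unrelated type whose string representation merely contains `apache_beam.pvalue.PCollection`.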

##########
File path: sdks/python/apache_beam/typehints/decorators.py
##########
@@ -378,6 +379,61 @@ def has_simple_output_type(self):
         self.output_types and len(self.output_types[0]) == 1 and
         not self.output_types[1])
 
+  def strip_pcoll_input(self):
+    # type: () -> IOTypeHints
+
+    error_str = 'An input typehint to a PTransform must be a single (or nested) type wrapped by a PCollection or ' \
+                'PBegin. '
+
+    if any(element is None for element in [self.input_types, self.input_types[0], self.input_types[0][0]]):

Review comment:
       `any()` does support short-circuiting, but the list `[self.input_types, self.input_types[0], self.input_types[0][0]]` is fully evaluated first, so both subscripts run (and can raise) before `any()` sees a single element.
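
   To illustrate the difference, here is a minimal standalone sketch. The function names are hypothetical and `input_types` is a plain argument rather than the `IOTypeHints` attribute. The eager version builds the whole list before `any()` runs; the chained `or` version stops at the first `None`.

```python
def any_is_none_eager(input_types):
  # The list literal is built before any() is called, so input_types[0]
  # and input_types[0][0] are evaluated even if input_types is None.
  return any(
      element is None
      for element in [input_types, input_types[0], input_types[0][0]])


def any_is_none_lazy(input_types):
  # `or` evaluates left to right and stops at the first true operand,
  # so later subscripts are never reached once a None is found.
  return (
      input_types is None or input_types[0] is None or
      input_types[0][0] is None)


print(any_is_none_lazy(None))  # no subscripting happens
try:
  any_is_none_eager(None)
except TypeError as e:
  print('eager version raised:', e)
```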

##########
File path: website/www/site/content/en/documentation/sdks/python-type-safety.md
##########
@@ -90,6 +91,15 @@ The following code declares an `int` input and a `str` output type hint on the `
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test_py3.py" type_hints_map_annotations >}}
 {{< /highlight >}}
 
+The following code demonstrates how to use annotations on `PTransform` subclasses. 
+A valid annotation is a `PCollection`, `PBegin`, or `PDone` that wraps an internal (nested) type. 

Review comment:
       @robertwb are PBegin and PDone part of the public API?

##########
File path: sdks/python/apache_beam/typehints/decorators.py
##########
@@ -378,6 +378,56 @@ def has_simple_output_type(self):
         self.output_types and len(self.output_types[0]) == 1 and
         not self.output_types[1])
 
+  def strip_pcoll(self):
+    return self.strip_pcoll_helper(self.input_types,
+                                   self._has_input_types,
+                                   {'input_types': None},
+                                   ['apache_beam.pvalue.PBegin'],
+                                   'An input typehint to a PTransform must be'
+                                   ' a single (or nested) type wrapped by '
+                                   'a PCollection or PBegin. ',
+                                   'strip_pcoll_input()').\
+                strip_pcoll_helper(self.output_types,
+                                   self.has_simple_output_type,
+                                   {'output_types': None},
+                                   ['apache_beam.pvalue.PDone'],
+                                   'An output typehint to a PTransform must be'
+                                   ' a single (or nested) type wrapped by '
+                                   'a PCollection or PDone. ',
+                                   'strip_pcoll_output()')
+
+  def strip_pcoll_helper(
+      self,
+      my_type,            # type: any
+      has_my_type,        # type: Callable[[], bool]
+      kwarg_dict,         # type: Dict[str, any]
+      my_valid_classes,   # type: List[str]
+      error_str,          # type: str
+      source_str          # type: str
+      ):
+    # type: (...) -> IOTypeHints
+
+    if not has_my_type() or len(my_type[0]) != 1:
+      return self
+
+    my_type = my_type[0][0]
+
+    if isinstance(my_type, typehints.AnyTypeConstraint):
+      return self
+
+    valid_classes = ['apache_beam.pvalue.PCollection'] + my_valid_classes
+
+    if not any(valid_class in str(my_type) for valid_class in valid_classes):
+      raise TypeCheckError(error_str)
+
+    if not hasattr(my_type, '__args__'):  # e.g. PCollection
+      kwarg_dict[next(iter(kwarg_dict))] = ((typehints.Any, ), {})

Review comment:
       The `next(iter(kwarg_dict))` call is hard to read. Since there is only one item in the dict, you could pass the key instead and create the dictionary below.
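
   A small sketch of the two shapes side by side, assuming the helper only needs the key name. Both function names are hypothetical, and `typing.Any` stands in for `typehints.Any` so the snippet runs without Beam.

```python
from typing import Any  # stand-in for apache_beam.typehints.Any


def fallback_kwargs_from_dict(kwarg_dict):
  # Current shape: recover the only key from a single-entry dict.
  kwarg_dict[next(iter(kwarg_dict))] = ((Any, ), {})
  return kwarg_dict


def fallback_kwargs_from_key(key):
  # Suggested shape: the caller passes 'input_types' or 'output_types'
  # and the dict is built at the point of use.
  return {key: ((Any, ), {})}


print(fallback_kwargs_from_key('input_types'))
```

   Both produce the same mapping; the second avoids mutating an argument and makes the single-key assumption explicit.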

##########
File path: sdks/python/apache_beam/pvalue.py
##########
@@ -222,7 +222,7 @@ class _InvalidUnpickledPCollection(object):
   pass
 
 
-class PBegin(PValue):
+class PBegin(PValue, Generic[T]):

Review comment:
       PBegin and PDone do not contain elements like a PCollection. I think of them as placeholders for transforms that don't have inputs or outputs, respectively.

##########
File path: sdks/python/apache_beam/examples/snippets/snippets_test_py3.py
##########
@@ -96,6 +96,18 @@ def my_fn(element: int) -> str:
       ids = numbers | 'to_id' >> beam.Map(my_fn)
       # [END type_hints_map_annotations]
 
+    # Example using an annotated PTransform.
+    with self.assertRaises(typehints.TypeCheckError):
+      # [START type_hints_ptransforms]
+      from apache_beam.pvalue import PCollection
+
+      class IntToStr(beam.PTransform):
+        def expand(pcoll: PCollection[int]) -> PCollection[str]:
+          return pcoll | beam.Map(lambda elem: str(elem))
+
+      ids = numbers | 'convert to str' >> beam.Map(my_fn)

Review comment:
       I think this line was not updated to use `IntToStr`.



