You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/03 22:12:28 UTC

[GitHub] [beam] robertwb commented on a change in pull request #12352: [BEAM-10549] Improve runtime type checking performance for the Python SDK

robertwb commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r464679106



##########
File path: sdks/python/apache_beam/runners/worker/operations.py
##########
@@ -238,6 +247,7 @@ def __init__(self,
     self.execution_context = None  # type: Optional[ExecutionContext]
     self.consumers = collections.defaultdict(
         list)  # type: DefaultDict[int, List[Operation]]
+    self.producer = None

Review comment:
       Where is this used? 

##########
File path: sdks/python/apache_beam/runners/common.py
##########
@@ -1340,6 +1342,17 @@ def process_outputs(
         self.per_element_output_counter.add_input(0)
       return
 
+    if isinstance(results, (dict, str, unicode, bytes)):
+      results_type = type(results).__name__
+      raise TypeCheckError(
+          'Returning a %s from a ParDo or FlatMap is '
+          'discouraged. Please use list("%s") if you really '
+          'want this behavior.' % (results_type, results))
+    elif not isinstance(results, collections.Iterable):

Review comment:
       This check is slow. Let's guard this or remove it (as it will still fail below).

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -188,7 +188,8 @@ def __init__(self,
             self.name_context.step_name,
             0,
             next(iter(itervalues(consumers))),
-            self.windowed_coder)
+            self.windowed_coder,
+            self)

Review comment:
       Generally it's preferable not to pass something big like `self`, but rather just the subset of information that's required here (e.g. the dict of type hint sources to type hints). This will also be needed for cases where different outputs have different types.

##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +268,71 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo):
+      if not self._in_combine:
+        transform.fn._full_label = applied_transform.full_label
+        self.store_type_hints(transform)
+
+  def store_type_hints(self, transform):
+    type_hints = transform.get_type_hints()
+
+    input_types = None
+    if type_hints.input_types:
+      normal_hints, kwarg_hints = type_hints.input_types
+
+      if kwarg_hints:
+        input_types = kwarg_hints
+      if normal_hints:
+        input_types = normal_hints
+
+    output_types = None
+    if type_hints.output_types:
+      normal_hints, kwarg_hints = type_hints.output_types
+
+      if kwarg_hints:
+        output_types = kwarg_hints
+      if normal_hints:
+        output_types = normal_hints
+
+    try:
+      argspec = inspect.getfullargspec(transform.fn._process_argspec_fn())
+      if len(argspec.args):
+        arg_index = 0
+        if argspec.args[0] == 'self':
+          arg_index = 1
+        transform.fn._runtime_parameter_name = argspec.args[arg_index]
+        if isinstance(input_types, dict):
+          input_types = (input_types[argspec.args[arg_index]], )
+    except TypeError:
+      pass
+
+    if input_types and len(input_types):
+      input_types = input_types[0]
+
+    if output_types and len(output_types):
+      output_types = output_types[0]
+
+    transform.fn._runtime_type_hints = type_hints._replace(
+        input_types=input_types, output_types=output_types)

Review comment:
       What if this transform has a type hint and the DoFn itself has a type hint? Will we check both? 
   
   I would probably create the dictionary of {type hint source string: type hint} right here, pre-packaged and ready to be used directly from the worker.

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -202,6 +209,37 @@ def __init__(
     self._sample_counter = 0
     self._next_sample = 0
 
+    self.producer_type_hints = None
+    self.producer_full_label = None
+    self.producer_parameter_name = None
+
+    if producer and hasattr(producer, 'spec') and hasattr(producer.spec,

Review comment:
       This logic belongs in the ParDoOperator (which I think already has a deserialized fn in hand). 

##########
File path: website/www/site/content/en/documentation/sdks/python-type-safety.md
##########
@@ -210,7 +210,21 @@ However, if you enable runtime type checking, the code is guaranteed to fail at
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_runtime_on >}}
 {{< /highlight >}}
 
-Note that because runtime type checks are done for each `PCollection` element, enabling this feature may incur a significant performance penalty. It is therefore recommended that runtime type checks are disabled for production pipelines.
+Note that because runtime type checks are done for each `PCollection` element, enabling this feature may incur a significant performance penalty. It is therefore recommended that runtime type checks are disabled for production pipelines. See the following section for a quicker, production-friendly alternative.
+
+### Faster Runtime Type Checking
+You can enable faster, sampling-based runtime type checking by setting the pipeline option `performance_runtime_type_check` to `True`.
+
+This is a Python 3-only feature that works by runtime type checking a small subset of values, called a sample, using optimized Cython code.

Review comment:
       Why is this Python 3 only? (Not that Python 2 is going to be around for long...)

##########
File path: sdks/python/apache_beam/options/pipeline_options.py
##########
@@ -466,6 +466,13 @@ def _add_argparse_args(cls, parser):
         help='Enable type checking at pipeline execution '
         'time. NOTE: only supported with the '
         'DirectRunner')
+    parser.add_argument(
+        '--performance_runtime_type_check',
+        default=False,
+        action='store_true',
+        help='Enable faster type checking via sampling at pipeline execution '
+        'time. NOTE: only supported with the '

Review comment:
       This should say "only supported with portable runners (including the direct runner)."

##########
File path: website/www/site/content/en/documentation/sdks/python-type-safety.md
##########
@@ -210,7 +210,21 @@ However, if you enable runtime type checking, the code is guaranteed to fail at
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets_test.py" type_hints_runtime_on >}}
 {{< /highlight >}}
 
-Note that because runtime type checks are done for each `PCollection` element, enabling this feature may incur a significant performance penalty. It is therefore recommended that runtime type checks are disabled for production pipelines.
+Note that because runtime type checks are done for each `PCollection` element, enabling this feature may incur a significant performance penalty. It is therefore recommended that runtime type checks are disabled for production pipelines. See the following section for a quicker, production-friendly alternative.
+
+### Faster Runtime Type Checking
+You can enable faster, sampling-based runtime type checking by setting the pipeline option `performance_runtime_type_check` to `True`.
+
+This is a Python 3-only feature that works by runtime type checking a small subset of values, called a sample, using optimized Cython code.
+
+Currently, this feature does not support runtime type checking for side inputs or combine operations. Specifically, this feature will not runtime type check the following transforms:

Review comment:
       Rather than enumerating the transforms here, I would just say that it doesn't run on combining operations. 

##########
File path: sdks/python/apache_beam/runners/worker/opcounters.py
##########
@@ -202,6 +209,37 @@ def __init__(
     self._sample_counter = 0
     self._next_sample = 0
 
+    self.producer_type_hints = None
+    self.producer_full_label = None
+    self.producer_parameter_name = None
+
+    if producer and hasattr(producer, 'spec') and hasattr(producer.spec,
+                                                          'serialized_fn'):
+      fns = pickler.loads(producer.spec.serialized_fn)
+      if fns:
+        if hasattr(fns[0], '_runtime_type_hints'):
+          self.producer_type_hints = fns[0]._runtime_type_hints
+        if hasattr(fns[0], '_full_label'):
+          self.producer_full_label = fns[0]._full_label
+        if hasattr(fns[0], '_runtime_parameter_name'):
+          self.producer_parameter_name = fns[0]._runtime_parameter_name
+
+    self.consumer_type_hints = []
+    self.consumer_full_labels = []
+    self.consumer_parameter_names = []
+
+    if consumers:

Review comment:
       Move this logic into the visitor, rather than doing graph inspection on the workers. This will also allow us to consolidate all type checks into a single dict, rather than having redundant code for producer/consumer. 

##########
File path: sdks/python/apache_beam/pipeline.py
##########
@@ -520,10 +520,26 @@ def run(self, test_runner_api='AUTO'):
             self._options,
             allow_proto_holders=True).run(False)
 
+      if (self._options.view_as(TypeOptions).runtime_type_check and
+          self._options.view_as(TypeOptions).performance_runtime_type_check):
+        raise RuntimeError(

Review comment:
       Why not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org