Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/23 20:39:45 UTC

[GitHub] [beam] KevinGG commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised

URL: https://github.com/apache/beam/pull/11174#discussion_r396742548
 
 

 ##########
 File path: sdks/python/apache_beam/pipeline.py
 ##########
 @@ -307,58 +307,61 @@ def _replace_if_needed(self, original_transform_node):
           elif len(inputs) == 0:
             input_node = pvalue.PBegin(self.pipeline)
 
-          # We have to add the new AppliedTransform to the stack before expand()
-          # and pop it out later to make sure that parts get added correctly.
-          self.pipeline.transforms_stack.append(replacement_transform_node)
-
-          # Keeping the same label for the replaced node but recursively
-          # removing labels of child transforms of original transform since they
-          # will be replaced during the expand below. This is needed in case
-          # the replacement contains children that have labels that conflicts
-          # with labels of the children of the original.
-          self.pipeline._remove_labels_recursively(original_transform_node)
-
-          new_output = replacement_transform.expand(input_node)
-          assert isinstance(
-              new_output, (dict, pvalue.PValue, pvalue.DoOutputsTuple))
-
-          if isinstance(new_output, pvalue.PValue):
-            new_output.element_type = None
-            self.pipeline._infer_result_type(
-                replacement_transform, inputs, new_output)
-
-          if isinstance(new_output, dict):
-            for new_tag, new_pcoll in new_output.items():
-              replacement_transform_node.add_output(new_pcoll, new_tag)
-          elif isinstance(new_output, pvalue.DoOutputsTuple):
-            replacement_transform_node.add_output(
-                new_output, new_output._main_tag)
-          else:
-            replacement_transform_node.add_output(new_output, new_output.tag)
-
-          # Recording updated outputs. This cannot be done in the same visitor
-          # since if we dynamically update output type here, we'll run into
-          # errors when visiting child nodes.
-          #
-          # NOTE: When replacing multiple outputs, the replacement PCollection
-          # tags must have a matching tag in the original transform.
-          if isinstance(new_output, pvalue.PValue):
-            if not new_output.producer:
-              new_output.producer = replacement_transform_node
-            output_map[original_transform_node.outputs[new_output.tag]] = \
-                new_output
-          elif isinstance(new_output, (pvalue.DoOutputsTuple, tuple)):
-            for pcoll in new_output:
-              if not pcoll.producer:
-                pcoll.producer = replacement_transform_node
-              output_map[original_transform_node.outputs[pcoll.tag]] = pcoll
-          elif isinstance(new_output, dict):
-            for tag, pcoll in new_output.items():
-              if not pcoll.producer:
-                pcoll.producer = replacement_transform_node
-              output_map[original_transform_node.outputs[tag]] = pcoll
-
-          self.pipeline.transforms_stack.pop()
+          try:
+            # We have to add the new AppliedTransform to the stack before
+            # expand() and pop it out later to make sure that parts get added
+            # correctly.
+            self.pipeline.transforms_stack.append(replacement_transform_node)
 
 Review comment:
  The `append()` call can't raise as long as `transforms_stack` is still a list. If it isn't a list (say it has become `None` or some other object), then `append()` raises, but the `pop()` in the `finally` block would raise too. So there's no need to keep it outside the `try` block.
  
  Putting it inside the `try` makes the code a bit more self-explanatory.
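  To illustrate both points, here is a minimal sketch of the push/expand/pop pattern with the `append()` inside the `try`. `FakePipeline` and `replace_with` are hypothetical stand-ins, not Beam APIs; only the `transforms_stack` attribute mirrors the diff. The first case shows that the replacement node is popped even when `expand()` fails. The second case shows that if the stack is no longer a list, an error surfaces either way, because the `pop()` in `finally` fails after `append()` does.

  ```python
  class FakePipeline:
      """Hypothetical stand-in for the pipeline; only transforms_stack matters here."""

      def __init__(self):
          self.transforms_stack = []


  def replace_with(pipeline, node, expand):
      """Push node, run expand(), and always pop node (hypothetical helper)."""
      try:
          # Appending inside the try: this cannot fail while transforms_stack is a list.
          pipeline.transforms_stack.append(node)
          return expand()
      finally:
          # Runs whether expand() returned or raised, so a failed node never lingers.
          pipeline.transforms_stack.pop()


  # Case 1: expand() raises, but the replacement node is still popped.
  pipeline = FakePipeline()
  pipeline.transforms_stack.append("root")


  def failing_expand():
      raise ValueError("expand failed")


  try:
      replace_with(pipeline, "replacement", failing_expand)
  except ValueError:
      pass
  print(pipeline.transforms_stack)  # ['root']

  # Case 2: the stack is no longer a list. append() raises inside the try,
  # then pop() in the finally block raises as well, so an error surfaces anyway.
  broken = FakePipeline()
  broken.transforms_stack = None
  final_error = None
  try:
      replace_with(broken, "replacement", lambda: "ok")
  except AttributeError as err:
      final_error = err  # keep a reference; `err` is unbound after the except block
  print(final_error)  # 'NoneType' object has no attribute 'pop'
  ```

  In case 2 the original `append()` failure is not lost: Python chains it onto the `pop()` error as `__context__`, so moving `append()` outside the `try` would only change which `AttributeError` is reported first.
  
  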
