You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2020/03/20 23:41:40 UTC

[GitHub] [beam] davidyan74 commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised

davidyan74 commented on a change in pull request #11174: [BEAM-7923] Pop failed transform when error is raised
URL: https://github.com/apache/beam/pull/11174#discussion_r395933604
 
 

 ##########
 File path: sdks/python/apache_beam/pipeline.py
 ##########
 @@ -307,58 +307,61 @@ def _replace_if_needed(self, original_transform_node):
           elif len(inputs) == 0:
             input_node = pvalue.PBegin(self.pipeline)
 
-          # We have to add the new AppliedTransform to the stack before expand()
-          # and pop it out later to make sure that parts get added correctly.
-          self.pipeline.transforms_stack.append(replacement_transform_node)
-
-          # Keeping the same label for the replaced node but recursively
-          # removing labels of child transforms of original transform since they
-          # will be replaced during the expand below. This is needed in case
-          # the replacement contains children that have labels that conflicts
-          # with labels of the children of the original.
-          self.pipeline._remove_labels_recursively(original_transform_node)
-
-          new_output = replacement_transform.expand(input_node)
-          assert isinstance(
-              new_output, (dict, pvalue.PValue, pvalue.DoOutputsTuple))
-
-          if isinstance(new_output, pvalue.PValue):
-            new_output.element_type = None
-            self.pipeline._infer_result_type(
-                replacement_transform, inputs, new_output)
-
-          if isinstance(new_output, dict):
-            for new_tag, new_pcoll in new_output.items():
-              replacement_transform_node.add_output(new_pcoll, new_tag)
-          elif isinstance(new_output, pvalue.DoOutputsTuple):
-            replacement_transform_node.add_output(
-                new_output, new_output._main_tag)
-          else:
-            replacement_transform_node.add_output(new_output, new_output.tag)
-
-          # Recording updated outputs. This cannot be done in the same visitor
-          # since if we dynamically update output type here, we'll run into
-          # errors when visiting child nodes.
-          #
-          # NOTE: When replacing multiple outputs, the replacement PCollection
-          # tags must have a matching tag in the original transform.
-          if isinstance(new_output, pvalue.PValue):
-            if not new_output.producer:
-              new_output.producer = replacement_transform_node
-            output_map[original_transform_node.outputs[new_output.tag]] = \
-                new_output
-          elif isinstance(new_output, (pvalue.DoOutputsTuple, tuple)):
-            for pcoll in new_output:
-              if not pcoll.producer:
-                pcoll.producer = replacement_transform_node
-              output_map[original_transform_node.outputs[pcoll.tag]] = pcoll
-          elif isinstance(new_output, dict):
-            for tag, pcoll in new_output.items():
-              if not pcoll.producer:
-                pcoll.producer = replacement_transform_node
-              output_map[original_transform_node.outputs[tag]] = pcoll
-
-          self.pipeline.transforms_stack.pop()
+          try:
+            # We have to add the new AppliedTransform to the stack before
+            # expand() and pop it out later to make sure that parts get added
+            # correctly.
+            self.pipeline.transforms_stack.append(replacement_transform_node)
 
 Review comment:
   This is a Python newbie question: can `list.append()` ever raise an exception? If so, should we move the `append()` call out of the `try` block so that we don't call `pop()` when `append()` itself has failed?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services