You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by "Abacn (via GitHub)" <gi...@apache.org> on 2023/05/26 18:40:40 UTC

[GitHub] [beam] Abacn commented on a diff in pull request #25965: Replace StorageV1 client with GCS client

Abacn commented on code in PR #25965:
URL: https://github.com/apache/beam/pull/25965#discussion_r1207191764


##########
sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py:
##########
@@ -170,43 +171,51 @@ def run(argv=None, save_main_session=True):
   pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
   with beam.Pipeline(options=pipeline_options) as p:
-
-    lines = p | ReadFromText(known_args.input)
-
-    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
-    split_lines_result = (
-        lines
-        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
-            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
-            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
-            main='words'))
-
-    # split_lines_result is an object of type DoOutputsTuple. It supports
-    # accessing result in alternative ways.
-    words, _, _ = split_lines_result
-    short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
-    character_count = split_lines_result.tag_character_count
-
-    # pylint: disable=expression-not-assigned
-    (
-        character_count
-        | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
-        | beam.GroupByKey()
-        | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
-        | 'write chars' >> WriteToText(known_args.output + '-chars'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        short_words
-        | 'count short words' >> CountWords()
-        |
-        'write short words' >> WriteToText(known_args.output + '-short-words'))
-
-    # pylint: disable=expression-not-assigned
-    (
-        words
-        | 'count words' >> CountWords()
-        | 'write words' >> WriteToText(known_args.output + '-words'))
+    try:
+      lines = p | ReadFromText(known_args.input)
+
+      # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
+      split_lines_result = (
+          lines
+          | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
+              SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
+              SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
+              main='words'))
+
+      # split_lines_result is an object of type DoOutputsTuple. It supports
+      # accessing result in alternative ways.
+      words, _, _ = split_lines_result
+      short_words = split_lines_result[
+          SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
+      character_count = split_lines_result.tag_character_count
+
+      # pylint: disable=expression-not-assigned
+      (
+          character_count
+          | 'pair_with_key' >> beam.Map(lambda x: ('chars_temp_key', x))
+          | beam.GroupByKey()
+          | 'count chars' >> beam.Map(lambda char_counts: sum(char_counts[1]))
+          | 'write chars' >> WriteToText(known_args.output + '-chars'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          short_words
+          | 'count short words' >> CountWords()
+          | 'write short words' >>
+          WriteToText(known_args.output + '-short-words'))
+
+      # pylint: disable=expression-not-assigned
+      (
+          words
+          | 'count words' >> CountWords()
+          | 'write words' >> WriteToText(known_args.output + '-words'))
+    except BeamIOError as err:

Review Comment:
   As mentioned before, these tests are now wrapped in `try`/`except` and would pass even if the pipeline fails. Please fix them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org