You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/05/07 18:39:00 UTC

[jira] [Work logged] (BEAM-10144) Update pipeline options snippets for best practices

     [ https://issues.apache.org/jira/browse/BEAM-10144?focusedWorklogId=593457&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-593457 ]

ASF GitHub Bot logged work on BEAM-10144:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/May/21 18:38
            Start Date: 07/May/21 18:38
    Worklog Time Spent: 10m 
      Work Description: davidcavazos commented on a change in pull request #14738:
URL: https://github.com/apache/beam/pull/14738#discussion_r628428329



##########
File path: sdks/python/apache_beam/examples/snippets/snippets.py
##########
@@ -110,80 +115,71 @@ def filter_words(unused_x):
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
 
-  with beam.Pipeline(options=PipelineOptions()) as p:
+  beam_options = PipelineOptions()
+  with beam.Pipeline(options=beam_options) as pipeline:
     pass  # build your pipeline here
     # [END pipelines_constructing_creating]
 
-    with TestPipeline() as p:  # Use TestPipeline for testing.
-      # pylint: disable=line-too-long
+    # [START pipelines_constructing_reading]
+    lines = pipeline | 'ReadMyFile' >> beam.io.ReadFromText(
+        'gs://some/inputData.txt')
+    # [END pipelines_constructing_reading]
 
-      # [START pipelines_constructing_reading]
-      lines = p | 'ReadMyFile' >> beam.io.ReadFromText(
-          'gs://some/inputData.txt')
-      # [END pipelines_constructing_reading]
+    # [START pipelines_constructing_applying]
+    words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
+    reversed_words = words | ReverseWords()
+    # [END pipelines_constructing_applying]
 
-      # [START pipelines_constructing_applying]
-      words = lines | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-      reversed_words = words | ReverseWords()
-      # [END pipelines_constructing_applying]
+    # [START pipelines_constructing_writing]
+    filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
+    filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
+        'gs://some/outputData.txt')
+    # [END pipelines_constructing_writing]
 
-      # [START pipelines_constructing_writing]
-      filtered_words = reversed_words | 'FilterWords' >> beam.Filter(
-          filter_words)
-      filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
-          'gs://some/outputData.txt')
-      # [END pipelines_constructing_writing]
+    pipeline.visit(SnippetUtils.RenameFiles(renames))
 
-      p.visit(SnippetUtils.RenameFiles(renames))
 
-
-def model_pipelines(argv):
+def model_pipelines():
   """A wordcount snippet as a simple pipeline example."""
   # [START model_pipelines]
+  import argparse
   import re
+  import sys
 
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
 
-  class MyOptions(PipelineOptions):
-    @classmethod
-    def _add_argparse_args(cls, parser):
-      parser.add_argument(
-          '--input',
-          dest='input',
-          default='gs://dataflow-samples/shakespeare/kinglear'
-          '.txt',
-          help='Input file to process.')
-      parser.add_argument(
-          '--output',
-          dest='output',
-          required=True,
-          help='Output file to write results to.')
-
-  pipeline_options = PipelineOptions(argv)
-  my_options = pipeline_options.view_as(MyOptions)
-
-  with beam.Pipeline(options=pipeline_options) as p:
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--input',
+      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+      help='Input file to process.')
+  parser.add_argument(
+      '--output', required=True, help='Output file to write results to.')
+  args, beam_args = parser.parse_known_args(sys.argv)
 
+  beam_options = PipelineOptions(beam_args)
+  with beam.Pipeline(options=beam_options) as pipeline:
     (
-        p
-        | beam.io.ReadFromText(my_options.input)
+        pipeline
+        | beam.io.ReadFromText(args.input)
         | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
         | beam.Map(lambda x: (x, 1))
         | beam.combiners.Count.PerKey()
-        | beam.io.WriteToText(my_options.output))
+        | beam.io.WriteToText(args.output))
   # [END model_pipelines]
 
 
-def model_pcollection(argv):
+def model_pcollection(output_path):
   """Creating a PCollection from data in local memory."""
   # [START model_pcollection]
+  import sys
+
   import apache_beam as beam
   from apache_beam.options.pipeline_options import PipelineOptions
 
-  # argv = None  # if None, uses sys.argv
-  pipeline_options = PipelineOptions(argv)
-  with beam.Pipeline(options=pipeline_options) as pipeline:
+  beam_options = PipelineOptions(sys.argv)

Review comment:
       I've been trying to mock `argparse.ArgumentParser.parse_known_args` as well, but it's also called several other times while building the PipelineOptions, so the mock isn't working as expected. I still haven't found a good way to do this; if you have any other suggestions, please let me know.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 593457)
    Time Spent: 2h 40m  (was: 2.5h)

> Update pipeline options snippets for best practices
> ---------------------------------------------------
>
>                 Key: BEAM-10144
>                 URL: https://issues.apache.org/jira/browse/BEAM-10144
>             Project: Beam
>          Issue Type: Improvement
>          Components: examples-python
>            Reporter: David Cavazos
>            Priority: P3
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)