You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/23 22:48:18 UTC

[GitHub] [beam] robertwb commented on a change in pull request #12355: [BEAM-10559] Add some comments and clean up SQL example.

robertwb commented on a change in pull request #12355:
URL: https://github.com/apache/beam/pull/12355#discussion_r459765700



##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
 # limitations under the License.
 #
 
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+

Review comment:
       Wordcount is the quintessential example of all Beam pipelines, and this does exactly the same thing that all the other wordcount variants in this folder do. I think it makes more sense to highlight the differences. 

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.

Review comment:
       There is the recently added `Row` type. The docs on SqlTransform should be updated, but that's not a prerequisite for using these transforms. 

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be a PCollection(s) of known schema.
+# One way to create such a PCollection is to produce a PCollection of
+# NamedTuple registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.
 MyRow = typing.NamedTuple('MyRow', [('word', unicode)])
 coders.registry.register_coder(MyRow, coders.RowCoder)
 
-# Some more fun queries:
-# ------
-# SELECT
-#   word as key,
-#   COUNT(*) as `count`
-# FROM PCOLLECTION
-# GROUP BY word
-# ORDER BY `count` DESC
-# LIMIT 100
-# ------
-# SELECT
-#   len as key,
-#   COUNT(*) as `count`
-# FROM (
-#   SELECT
-#     LENGTH(word) AS len
-#   FROM PCOLLECTION
-# )
-# GROUP BY len
-
 
 def run(p, input_file, output_file):
   #pylint: disable=expression-not-assigned
   (
       p
-      | 'read' >> ReadFromText(input_file)
-      | 'split' >> beam.FlatMap(str.split)
-      | 'row' >> beam.Map(MyRow).with_output_types(MyRow)
-      | 'sql!!' >> SqlTransform(
+      | 'Read' >> ReadFromText(input_file)
+      | 'Split' >> beam.FlatMap(lambda line: re.split(r'\W+', line))
+      | 'ToRow' >> beam.Map(MyRow).with_output_types(MyRow)
+      | 'Sql!!' >> SqlTransform(
           """
                    SELECT
                      word as key,
                      COUNT(*) as `count`
                    FROM PCOLLECTION
                    GROUP BY word""")
-      | 'format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))
-      | 'write' >> WriteToText(output_file))
-
-  result = p.run()
-  result.wait_until_finish()
+      | 'Format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))

Review comment:
       Maybe just 
   
   SqlTransform yields a PCollection containing elements with attributes based on the output of the query.
   
   The fact that it's a named tuple is not needed for its use (and I'm not sure we want to promise that). 

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -101,12 +89,33 @@ def main():
   # workflow rely on global context (e.g., a module imported at module level).
   pipeline_options.view_as(SetupOptions).save_main_session = True
 
-  p = beam.Pipeline(options=pipeline_options)
-  # Preemptively start due to BEAM-6666.
-  p.runner.create_job_service(pipeline_options)
+  with beam.Pipeline(options=pipeline_options) as p:
+    if isinstance(p.runner, portable_runner.PortableRunner):

Review comment:
       This will just work for Dataflow. That's why I added the guard. 

##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -15,12 +15,16 @@
 # limitations under the License.
 #
 
-"""A cross-language word-counting workflow."""
+"""A cross-language word-counting workflow.
+
+Java and docker must be available to run this pipeline.

Review comment:
       I don't know what such a readme would say. You run the pipeline with `python wordcount_xlang_sql.py` or `python -m apache_beam.examples.wordcount_xlang_sql` just as with all the other examples here. For Dataflow you set `--runner=DataflowRunner` (plus all the other parameters as documented elsewhere, but I did add a note about the experiment), for Flink you set `--runner=FlinkRunner`. Unlike kafka, there's no need for special instructions about setting up or connecting to an external cluster. Pretty much anything I can think of putting in a readme for this pipeline would be equally as well placed in a readme for running pipelines in general. 
   
   The best readme is a readme that is not needed because it just works. 
   
   (I do think it'd be good to have more docs and other more comprehensive examples, e.g. using `Row` and joins and more complicated queries.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org