You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/23 23:05:09 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #12355: [BEAM-10559] Add some comments and clean up SQL example.

TheNeuralBit commented on a change in pull request #12355:
URL: https://github.com/apache/beam/pull/12355#discussion_r459775292



##########
File path: sdks/python/apache_beam/examples/wordcount_xlang_sql.py
##########
@@ -31,51 +35,35 @@
 from apache_beam.io import WriteToText
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms.sql import SqlTransform
 
+# The input to SqlTransform must be one or more PCollections with a known
+# schema. One way to create such a PCollection is to produce a PCollection
+# of NamedTuple instances whose type is registered with the RowCoder.
+#
+# Here we create and register a simple NamedTuple with a single unicode typed
+# field named 'word' which we will use below.
 MyRow = typing.NamedTuple('MyRow', [('word', unicode)])
 coders.registry.register_coder(MyRow, coders.RowCoder)
 
-# Some more fun queries:
-# ------
-# SELECT
-#   word as key,
-#   COUNT(*) as `count`
-# FROM PCOLLECTION
-# GROUP BY word
-# ORDER BY `count` DESC
-# LIMIT 100
-# ------
-# SELECT
-#   len as key,
-#   COUNT(*) as `count`
-# FROM (
-#   SELECT
-#     LENGTH(word) AS len
-#   FROM PCOLLECTION
-# )
-# GROUP BY len
-
 
 def run(p, input_file, output_file):
   #pylint: disable=expression-not-assigned
   (
       p
-      | 'read' >> ReadFromText(input_file)
-      | 'split' >> beam.FlatMap(str.split)
-      | 'row' >> beam.Map(MyRow).with_output_types(MyRow)
-      | 'sql!!' >> SqlTransform(
+      | 'Read' >> ReadFromText(input_file)
+      | 'Split' >> beam.FlatMap(lambda line: re.split(r'\W+', line))
+      | 'ToRow' >> beam.Map(MyRow).with_output_types(MyRow)
+      | 'Sql!!' >> SqlTransform(
           """
                    SELECT
                      word as key,
                      COUNT(*) as `count`
                    FROM PCOLLECTION
                    GROUP BY word""")
-      | 'format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))
-      | 'write' >> WriteToText(output_file))
-
-  result = p.run()
-  result.wait_until_finish()
+      | 'Format' >> beam.Map(lambda row: '{}: {}'.format(row.key, row.count))

Review comment:
       SGTM :+1: 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org