Posted to commits@beam.apache.org by bh...@apache.org on 2021/10/15 20:39:36 UTC

[beam] branch master updated: [BEAM-11480] Use snippets for DataFrame examples (#15600)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 83bd5d7  [BEAM-11480] Use snippets for DataFrame examples (#15600)
83bd5d7 is described below

commit 83bd5d7d943043dc070aa3e1d7d9f1b290196471
Author: Brian Hulette <bh...@google.com>
AuthorDate: Fri Oct 15 13:38:36 2021 -0700

    [BEAM-11480] Use snippets for DataFrame examples (#15600)
    
    * Use snippets for DataFrame examples
    
    * add imports back
    
    * remove unnecessary whitespace
---
 .../apache_beam/examples/dataframe/taxiride.py     |  4 +++
 .../apache_beam/examples/dataframe/wordcount.py    |  4 +++
 .../en/documentation/dsls/dataframes/overview.md   | 29 ++--------------------
 3 files changed, 10 insertions(+), 27 deletions(-)
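For context on the mechanism this change relies on: the [START ...]/[END ...]
comments added below mark regions for the website's code_sample shortcode
(used in overview.md further down), so the published docs always show code
that is exercised by the tested examples. The real extraction lives in the
Hugo site; the sketch below is only an illustration of the tag-matching idea,
and the extract_snippet helper is hypothetical.

    import re

    def extract_snippet(path, tag):
        # Hypothetical helper: return the code between the [START tag] and
        # [END tag] comment markers in a source file.
        with open(path) as f:
            source = f.read()
        pattern = r'\[START %s\](.*?)\[END %s\]' % (
            re.escape(tag), re.escape(tag))
        match = re.search(pattern, source, re.DOTALL)
        if match is None:
            raise ValueError('snippet %r not found in %s' % (tag, path))
        # Drop the tail of the [START ...] line and the comment prefix of
        # the [END ...] line, keeping only the code in between.
        return match.group(1).split('\n', 1)[1].rsplit('\n', 1)[0]

    print(extract_snippet(
        'sdks/python/apache_beam/examples/dataframe/taxiride.py',
        'DataFrame_taxiride_aggregation'))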

diff --git a/sdks/python/apache_beam/examples/dataframe/taxiride.py b/sdks/python/apache_beam/examples/dataframe/taxiride.py
index 5278235..391d371 100644
--- a/sdks/python/apache_beam/examples/dataframe/taxiride.py
+++ b/sdks/python/apache_beam/examples/dataframe/taxiride.py
@@ -34,12 +34,14 @@ ZONE_LOOKUP_PATH = (
 
 def run_aggregation_pipeline(pipeline, input_path, output_path):
   # The pipeline will be run on exiting the with block.
+  # [START DataFrame_taxiride_aggregation]
   with pipeline as p:
     rides = p | read_csv(input_path)
 
     # Count the number of passengers dropped off per LocationID
     agg = rides.groupby('DOLocationID').passenger_count.sum()
     agg.to_csv(output_path)
+    # [END DataFrame_taxiride_aggregation]
 
 
 def run_enrich_pipeline(
@@ -47,6 +49,7 @@ def run_enrich_pipeline(
  """Enrich taxi ride data with a zone lookup table and perform a grouped
  aggregation."""
   # The pipeline will be run on exiting the with block.
+  # [START DataFrame_taxiride_enrich]
   with pipeline as p:
     rides = p | "Read taxi rides" >> read_csv(input_path)
     zones = p | "Read zone lookup" >> read_csv(zone_lookup_path)
@@ -63,6 +66,7 @@ def run_enrich_pipeline(
     # Sum passengers dropped off per Borough
     agg = rides.groupby('Borough').passenger_count.sum()
     agg.to_csv(output_path)
+    # [END DataFrame_taxiride_enrich]
 
     # A more intuitive alternative to the above merge call, but this option
     # doesn't preserve index, thus requires non-parallel execution.
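Read on its own, the tagged aggregation region above amounts to the short
pipeline below. This is a sketch, not part of the commit: the gs:// sample
path comes from the overview.md snippet this change replaces, and 'output'
is a placeholder output prefix.

    import apache_beam as beam
    from apache_beam.dataframe.io import read_csv

    with beam.Pipeline() as p:
        rides = p | read_csv('gs://apache-beam-samples/nyc_taxi/misc/sample.csv')
        # Count the number of passengers dropped off per LocationID.
        agg = rides.groupby('DOLocationID').passenger_count.sum()
        agg.to_csv('output')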
diff --git a/sdks/python/apache_beam/examples/dataframe/wordcount.py b/sdks/python/apache_beam/examples/dataframe/wordcount.py
index 61efd0f..3e92417 100644
--- a/sdks/python/apache_beam/examples/dataframe/wordcount.py
+++ b/sdks/python/apache_beam/examples/dataframe/wordcount.py
@@ -51,6 +51,8 @@ def run(argv=None):
   # The pipeline will be run on exiting the with block.
   with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
 
+    # [START DataFrame_wordcount]
+
     # Read the text file[pattern] into a PCollection.
     lines = p | 'Read' >> ReadFromText(known_args.input)
 
@@ -70,6 +72,8 @@ def run(argv=None):
     # Deferred DataFrames can also be converted back to schema'd PCollections
     counted_pc = to_pcollection(counted, include_indexes=True)
 
+    # [END DataFrame_wordcount]
+
     # Print out every word that occurred >50 times
     _ = (
         counted_pc
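The region tagged above demonstrates the round trip between PCollections and
DataFrames via to_dataframe and to_pcollection. A self-contained sketch of
the same flow, with beam.Create standing in for the example's ReadFromText
source so it runs without an input file:

    import re

    import apache_beam as beam
    from apache_beam.dataframe.convert import to_dataframe
    from apache_beam.dataframe.convert import to_pcollection

    with beam.Pipeline() as p:
        lines = p | beam.Create(['the quick brown fox', 'the quick dog'])
        words = (
            lines
            | beam.FlatMap(
                lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
            # Row objects give the PCollection a schema, which to_dataframe
            # requires.
            | beam.Map(lambda word: beam.Row(word=word)))

        df = to_dataframe(words)
        df['count'] = 1
        counted = df.groupby('word').sum()

        # Convert back to a schema'd PCollection; include_indexes=True keeps
        # the 'word' index as a field.
        counted_pc = to_pcollection(counted, include_indexes=True)
        _ = counted_pc | beam.Map(print)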
diff --git a/website/www/site/content/en/documentation/dsls/dataframes/overview.md b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
index c2c9f8f..9255f7f 100644
--- a/website/www/site/content/en/documentation/dsls/dataframes/overview.md
+++ b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
@@ -45,11 +45,7 @@ You can use DataFrames as shown in the following example, which reads New York C
 
 {{< highlight py >}}
 from apache_beam.dataframe.io import read_csv
-
-with beam.Pipeline() as p:
-  df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
-  agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
-  agg.to_csv('output')
+{{< code_sample "sdks/python/apache_beam/examples/dataframe/taxiride.py" DataFrame_taxiride_aggregation >}}
 {{< /highlight >}}
 
 pandas is able to infer column names from the first row of the CSV data, which is where `passenger_count` and `DOLocationID` come from.
@@ -64,32 +60,11 @@ To use the DataFrames API in a larger pipeline, you can convert a PCollection to
 
 Here’s an example that creates a schema-aware PCollection, converts it to a DataFrame using [`to_dataframe`][pydoc_to_dataframe], processes the DataFrame, and then converts the DataFrame back to a PCollection using [`to_pcollection`][pydoc_to_pcollection]:
 
-<!-- TODO(BEAM-11480): Convert these examples to snippets -->
 {{< highlight py >}}
 from apache_beam.dataframe.convert import to_dataframe
 from apache_beam.dataframe.convert import to_pcollection
 ...
-    # Read the text file[pattern] into a PCollection.
-    lines = p | 'Read' >> ReadFromText(known_args.input)
-
-    words = (
-        lines
-        | 'Split' >> beam.FlatMap(
-            lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
-        # Map to Row objects to generate a schema suitable for conversion
-        # to a dataframe.
-        | 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))
-
-    df = to_dataframe(words)
-    df['count'] = 1
-    counted = df.groupby('word').sum()
-    counted.to_csv(known_args.output)
-
-    # Deferred DataFrames can also be converted back to schema'd PCollections
-    counted_pc = to_pcollection(counted, include_indexes=True)
-
-    # Do something with counted_pc
-    ...
+{{< code_sample "sdks/python/apache_beam/examples/dataframe/wordcount.py" DataFrame_wordcount >}}
 {{< /highlight >}}
 
 You can find the full wordcount example on