You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2021/10/15 20:39:36 UTC
[beam] branch master updated: [BEAM-11480] Use snippets for DataFrame examples (#15600)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 83bd5d7 [BEAM-11480] Use snippets for DataFrame examples (#15600)
83bd5d7 is described below
commit 83bd5d7d943043dc070aa3e1d7d9f1b290196471
Author: Brian Hulette <bh...@google.com>
AuthorDate: Fri Oct 15 13:38:36 2021 -0700
[BEAM-11480] Use snippets for DataFrame examples (#15600)
* Use snippets for DataFrame examples
* add imports back
* remove unnecessary whitespace
---
.../apache_beam/examples/dataframe/taxiride.py | 4 +++
.../apache_beam/examples/dataframe/wordcount.py | 4 +++
.../en/documentation/dsls/dataframes/overview.md | 29 ++--------------------
3 files changed, 10 insertions(+), 27 deletions(-)
diff --git a/sdks/python/apache_beam/examples/dataframe/taxiride.py b/sdks/python/apache_beam/examples/dataframe/taxiride.py
index 5278235..391d371 100644
--- a/sdks/python/apache_beam/examples/dataframe/taxiride.py
+++ b/sdks/python/apache_beam/examples/dataframe/taxiride.py
@@ -34,12 +34,14 @@ ZONE_LOOKUP_PATH = (
def run_aggregation_pipeline(pipeline, input_path, output_path):
# The pipeline will be run on exiting the with block.
+ # [START DataFrame_taxiride_aggregation]
with pipeline as p:
rides = p | read_csv(input_path)
# Count the number of passengers dropped off per LocationID
agg = rides.groupby('DOLocationID').passenger_count.sum()
agg.to_csv(output_path)
+ # [END DataFrame_taxiride_aggregation]
def run_enrich_pipeline(
@@ -47,6 +49,7 @@ def run_enrich_pipeline(
"""Enrich taxi ride data with zone lookup table and perform a grouped
aggregation."""
# The pipeline will be run on exiting the with block.
+ # [START DataFrame_taxiride_enrich]
with pipeline as p:
rides = p | "Read taxi rides" >> read_csv(input_path)
zones = p | "Read zone lookup" >> read_csv(zone_lookup_path)
@@ -63,6 +66,7 @@ def run_enrich_pipeline(
# Sum passengers dropped off per Borough
agg = rides.groupby('Borough').passenger_count.sum()
agg.to_csv(output_path)
+ # [END DataFrame_taxiride_enrich]
# A more intuitive alternative to the above merge call, but this option
# doesn't preserve index, thus requires non-parallel execution.
diff --git a/sdks/python/apache_beam/examples/dataframe/wordcount.py b/sdks/python/apache_beam/examples/dataframe/wordcount.py
index 61efd0f..3e92417 100644
--- a/sdks/python/apache_beam/examples/dataframe/wordcount.py
+++ b/sdks/python/apache_beam/examples/dataframe/wordcount.py
@@ -51,6 +51,8 @@ def run(argv=None):
# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
+ # [START DataFrame_wordcount]
+
# Read the text file[pattern] into a PCollection.
lines = p | 'Read' >> ReadFromText(known_args.input)
@@ -70,6 +72,8 @@ def run(argv=None):
# Deferred DataFrames can also be converted back to schema'd PCollections
counted_pc = to_pcollection(counted, include_indexes=True)
+ # [END DataFrame_wordcount]
+
# Print out every word that occurred >50 times
_ = (
counted_pc
diff --git a/website/www/site/content/en/documentation/dsls/dataframes/overview.md b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
index c2c9f8f..9255f7f 100644
--- a/website/www/site/content/en/documentation/dsls/dataframes/overview.md
+++ b/website/www/site/content/en/documentation/dsls/dataframes/overview.md
@@ -45,11 +45,7 @@ You can use DataFrames as shown in the following example, which reads New York C
{{< highlight py >}}
from apache_beam.dataframe.io import read_csv
-
-with beam.Pipeline() as p:
- df = p | read_csv("gs://apache-beam-samples/nyc_taxi/misc/sample.csv")
- agg = df[['passenger_count', 'DOLocationID']].groupby('DOLocationID').sum()
- agg.to_csv('output')
+{{< code_sample "sdks/python/apache_beam/examples/dataframe/taxiride.py" DataFrame_taxiride_aggregation >}}
{{< /highlight >}}
pandas is able to infer column names from the first row of the CSV data, which is where `passenger_count` and `DOLocationID` come from.
@@ -64,32 +60,11 @@ To use the DataFrames API in a larger pipeline, you can convert a PCollection to
Here’s an example that creates a schema-aware PCollection, converts it to a DataFrame using [`to_dataframe`][pydoc_to_dataframe], processes the DataFrame, and then converts the DataFrame back to a PCollection using [`to_pcollection`][pydoc_to_pcollection]:
-<!-- TODO(BEAM-11480): Convert these examples to snippets -->
{{< highlight py >}}
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
...
- # Read the text file[pattern] into a PCollection.
- lines = p | 'Read' >> ReadFromText(known_args.input)
-
- words = (
- lines
- | 'Split' >> beam.FlatMap(
- lambda line: re.findall(r'[\w]+', line)).with_output_types(str)
- # Map to Row objects to generate a schema suitable for conversion
- # to a dataframe.
- | 'ToRows' >> beam.Map(lambda word: beam.Row(word=word)))
-
- df = to_dataframe(words)
- df['count'] = 1
- counted = df.groupby('word').sum()
- counted.to_csv(known_args.output)
-
- # Deferred DataFrames can also be converted back to schema'd PCollections
- counted_pc = to_pcollection(counted, include_indexes=True)
-
- # Do something with counted_pc
- ...
+{{< code_sample "sdks/python/apache_beam/examples/dataframe/wordcount.py" DataFrame_wordcount >}}
{{< /highlight >}}
You can find the full wordcount example on