Posted to commits@beam.apache.org by al...@apache.org on 2017/05/03 01:27:47 UTC

[1/3] beam-site git commit: Add Python snippets to Mobile Gaming.

Repository: beam-site
Updated Branches:
  refs/heads/asf-site 496f24014 -> 3cafa86a0


Add Python snippets to Mobile Gaming.


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/bf62dc99
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/bf62dc99
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/bf62dc99

Branch: refs/heads/asf-site
Commit: bf62dc99069178c440b867a69042b05dda0cc439
Parents: 496f240
Author: Hadar Hod <ha...@google.com>
Authored: Fri Apr 21 09:36:57 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 18:23:32 2017 -0700

----------------------------------------------------------------------
 src/get-started/mobile-gaming-example.md | 155 +++++++++++++++++++++++++-
 1 file changed, 153 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/bf62dc99/src/get-started/mobile-gaming-example.md
----------------------------------------------------------------------
diff --git a/src/get-started/mobile-gaming-example.md b/src/get-started/mobile-gaming-example.md
index 7519d3b..5c97274 100644
--- a/src/get-started/mobile-gaming-example.md
+++ b/src/get-started/mobile-gaming-example.md
@@ -10,9 +10,21 @@ redirect_from: /use/mobile-gaming-example/
 * TOC
 {:toc}
 
+<nav class="language-switcher">
+  <strong>Adapt for:</strong> 
+  <ul>
+    <li data-type="language-java">Java SDK</li>
+    <li data-type="language-py">Python SDK</li>
+  </ul>
+</nav>
+
 This section provides a walkthrough of a series of example Apache Beam pipelines that demonstrate more complex functionality than the basic [WordCount]({{ site.baseurl }}/get-started/wordcount-example) examples. The pipelines in this section process data from a hypothetical game that users play on their mobile phones. The pipelines demonstrate processing at increasing levels of complexity; the first pipeline, for example, shows how to run a batch analysis job to obtain relatively simple score data, while the later pipelines use Beam's windowing and triggers features to provide low-latency data analysis and more complex intelligence about users' play patterns.
 
-> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7.
+{:.language-java}
+> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7. 
+
+{:.language-py}
+> **Note**: These examples assume some familiarity with the Beam programming model. If you haven't already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing.
 
 Every time a user plays an instance of our hypothetical mobile game, they generate a data event. Each data event consists of the following information:
 
@@ -44,8 +56,12 @@ The Mobile Game example pipelines vary in complexity, from simple batch analysis
 
 The `UserScore` pipeline is the simplest example for processing mobile game data. `UserScore` determines the total score per user over a finite data set (for example, one day's worth of scores stored on the game server). Pipelines like `UserScore` are best run periodically after all relevant data has been gathered. For example, `UserScore` could run as a nightly job over data gathered during that day.
 
+{:.language-java}
 > **Note:** See [UserScore on GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java) for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [UserScore on GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py) for the complete example pipeline program.
+
 ### What Does UserScore Do?
 
 In a day's worth of scoring data, each user ID may have multiple records (if the user plays more than one instance of the game during the analysis window), each with their own score value and timestamp. If we want to determine the total score over all the instances a user plays during the day, our pipeline will need to group all the records together per individual user.
@@ -99,6 +115,28 @@ public static class ExtractAndSumScore
 }
 ```
 
+```py
+class ExtractAndSumScore(beam.PTransform):
+  """A transform to extract key/score information and sum the scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """
+  def __init__(self, field):
+    super(ExtractAndSumScore, self).__init__()
+    self.field = field
+
+  def expand(self, pcoll):
+    return (pcoll
+            | beam.Map(lambda info: (info[self.field], info['score']))
+            | beam.CombinePerKey(sum_ints))
+
+def configure_bigquery_write():
+  return [
+      ('user', 'STRING', lambda e: e[0]),
+      ('total_score', 'INTEGER', lambda e: e[1]),
+  ]
+```
+
 `ExtractAndSumScore` is written to be more general, in that you can pass in the field by which you want to group the data (in the case of our game, by unique user or unique team). This means we can re-use `ExtractAndSumScore` in other pipelines that group score data by team, for example.
 
 Here's the main method of `UserScore`, showing how we apply all three steps of the pipeline:
@@ -123,6 +161,25 @@ public static void main(String[] args) throws Exception {
 }
 ```
 
+```py
+def run(argv=None):
+  """Main entry point; defines and runs the user_score pipeline."""
+  
+  ...
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input) # Read events from a file and parse them.
+   | UserScore()
+   | WriteToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+```
+
 ### Working with the Results
 
 `UserScore` writes the data to a BigQuery table (called `user_score` by default). With the data in the BigQuery table, we might perform a further interactive analysis, such as querying for a list of the N top-scoring users for a given day.
@@ -153,8 +210,12 @@ The `HourlyTeamScore` pipeline expands on the basic batch analysis principles us
 
 Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). The pipeline reads a fixed data set from a file, and writes the results to a Google Cloud BigQuery table, just like `UserScore`.
 
+{:.language-java}
 > **Note:** See [HourlyTeamScore on GitHub](https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java) for the complete example pipeline program.
 
+{:.language-py}
+> **Note:** See [HourlyTeamScore on GitHub](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py) for the complete example pipeline program.
+
 ### What Does HourlyTeamScore Do?
 
 `HourlyTeamScore` calculates the total score per team, per hour, in a fixed data set (such as one day's worth of data).
@@ -184,7 +245,13 @@ Notice that as processing time advances, the sums are now _per window_; each win
 
 Beam's windowing feature uses the [intrinsic timestamp information]({{ site.baseurl }}/documentation/programming-guide/#pctimestamps) attached to each element of a `PCollection`. Because we want our pipeline to window based on _event time_, we **must first extract the timestamp** that's embedded in each data record and apply it to the corresponding element in the `PCollection` of score data. Then, the pipeline can **apply the windowing function** to divide the `PCollection` into logical windows.
 
-Here's the code, which shows how `HourlyTeamScore` uses the [WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java) and [Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java) transforms to perform these operations:
+{:.language-java}
+`HourlyTeamScore` uses the [WithTimestamps](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java) and [Window](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java) transforms to perform these operations.
+
+{:.language-py}
+`HourlyTeamScore` uses the `FixedWindows` transform, found in [window.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py), to perform these operations.
+
+The following code shows this: 
 
 ```java
 // Add an element timestamp based on the event log, and apply fixed windowing.
@@ -194,6 +261,17 @@ Here's the code, which shows how `HourlyTeamScore` uses the [WithTimestamps](htt
         FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
 ```
 
+```py
+# Add an element timestamp based on the event log, and apply fixed windowing.
+# Convert element['timestamp'] into seconds as expected by TimestampedValue.
+| 'AddEventTimestamps' >> beam.Map(
+    lambda element: TimestampedValue(
+        element, element['timestamp'] / 1000.0))
+# Convert window_duration into seconds as expected by FixedWindows.
+| 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
+    size=self.window_duration * 60))
+```
+
 Notice that the transforms the pipeline uses to specify the windowing are distinct from the actual data processing transforms (such as `ExtractAndSumScore`). This functionality provides you some flexibility in designing your Beam pipeline, in that you can run existing transforms over datasets with different windowing characteristics.
 
 #### Filtering Based On Event Time
@@ -215,6 +293,13 @@ The following code shows how `HourlyTeamScore` uses the `Filter` transform to fi
         -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
 ```
 
+```py
+| 'FilterStartTime' >> beam.Filter(
+    lambda element: element['timestamp'] > start_min_filter)
+| 'FilterEndTime' >> beam.Filter(
+    lambda element: element['timestamp'] < end_min_filter)
+```
+
 #### Calculating Score Per Team, Per Window
 
 `HourlyTeamScore` uses the same `ExtractAndSumScore` transform as the `UserScore` pipeline, but passes a different key (team, as opposed to user). Also, because the pipeline applies `ExtractAndSumScore` _after_ applying fixed-time 1-hour windowing to the input data, the data gets grouped by both team _and_ window. You can see the full sequence of transforms in `HourlyTeamScore`'s main method:
@@ -266,6 +351,68 @@ public static void main(String[] args) throws Exception {
 }
 ```
 
+```py
+class HourlyTeamScore(beam.PTransform):
+  def __init__(self, start_min, stop_min, window_duration):
+    super(HourlyTeamScore, self).__init__()
+    self.start_min = start_min
+    self.stop_min = stop_min
+    self.window_duration = window_duration
+
+  def expand(self, pcoll):
+    start_min_filter = string_to_timestamp(self.start_min)
+    end_min_filter = string_to_timestamp(self.stop_min)
+
+    return (
+        pcoll
+        | 'ParseGameEvent' >> beam.ParDo(ParseEventFn())
+        # Filter out data before and after the given times so that it is not
+        # included in the calculations. As we collect data in batches (say, by
+        # day), the batch for the day that we want to analyze could potentially
+        # include some late-arriving data from the previous day. If so, we want
+        # to weed it out. Similarly, if we include data from the following day
+        # (to scoop up late-arriving events from the day we're analyzing), we
+        # need to weed out events that fall after the time period we want to
+        # analyze.
+        | 'FilterStartTime' >> beam.Filter(
+            lambda element: element['timestamp'] > start_min_filter)
+        | 'FilterEndTime' >> beam.Filter(
+            lambda element: element['timestamp'] < end_min_filter)
+        # Add an element timestamp based on the event log, and apply fixed
+        # windowing.
+        # Convert element['timestamp'] into seconds as expected by
+        # TimestampedValue.
+        | 'AddEventTimestamps' >> beam.Map(
+            lambda element: TimestampedValue(
+                element, element['timestamp'] / 1000.0))
+        # Convert window_duration into seconds as expected by FixedWindows.
+        | 'FixedWindowsTeam' >> beam.WindowInto(FixedWindows(
+            size=self.window_duration * 60))
+        # Extract and sum teamname/score pairs from the event data.
+        | 'ExtractTeamScore' >> ExtractAndSumScore('team'))
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the hourly_team_score pipeline."""
+  ...
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  p = beam.Pipeline(options=pipeline_options)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+
+  (p  # pylint: disable=expression-not-assigned
+   | ReadFromText(known_args.input)
+   | HourlyTeamScore(
+       known_args.start_min, known_args.stop_min, known_args.window_duration)
+   | WriteWindowedToBigQuery(
+       known_args.table_name, known_args.dataset, configure_bigquery_write()))
+
+  result = p.run()
+  result.wait_until_finish()
+```
+
 ### Limitations
 
 As written, `HourlyTeamScore` still has a limitation:
@@ -275,6 +422,8 @@ As written, `HourlyTeamScore` still has a limitation:
 
 ## LeaderBoard: Streaming Processing with Real-Time Game Data
 
+> **Note:** This example currently exists in Java only.
+
 One way we can help address the latency issue present in the `UserScore` and `HourlyTeamScore` pipelines is by reading the score data from an unbounded source. The `LeaderBoard` pipeline introduces streaming processing by reading the game score data from an unbounded source that produces an infinite amount of data, rather than from a file on the game server.
 
 The `LeaderBoard` pipeline also demonstrates how to process game score data with respect to both _processing time_ and _event time_. `LeaderBoard` outputs data about both individual user scores and about team scores, each with respect to a different time frame.
@@ -405,6 +554,8 @@ Taken together, these processing strategies let us address the latency and compl
 
 ## GameStats: Abuse Detection and Usage Analysis
 
+> **Note:** This example currently exists in Java only.
+
 While `LeaderBoard` demonstrates how to use basic windowing and triggers to perform low-latency and flexible data analysis, we can use more advanced windowing techniques to perform more comprehensive analysis. This might include some calculations designed to detect system abuse (like spam) or to gain insight into user behavior. The `GameStats` pipeline builds on the low-latency functionality in `LeaderBoard` to demonstrate how you can use Beam to perform this kind of advanced analysis.
 
 Like `LeaderBoard`, `GameStats` reads data from an unbounded source. It is best thought of as an ongoing job that provides insight into the game as users play.
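
Reviewer note on the Python snippets above: the sketch below is not part of the commit and does not use the Beam SDK. It is a plain-Python approximation of what `ExtractAndSumScore` and `HourlyTeamScore` compute, assuming events are dicts with `user`, `team`, `score`, and a millisecond `timestamp`, and assuming fixed windows aligned to the epoch (offset zero). The function names and sample events are hypothetical and exist only for illustration.

```python
from collections import defaultdict


def sum_scores_by_field(events, field):
    """Plain-Python analogue of ExtractAndSumScore: total score per key."""
    totals = defaultdict(int)
    for event in events:
        totals[event[field]] += event['score']
    return dict(totals)


def hourly_team_scores(events, start_ms, stop_ms, window_minutes=60):
    """Analogue of HourlyTeamScore: filter by event time, assign each event
    to a fixed window, then sum scores per (team, window start in seconds)."""
    window_size_s = window_minutes * 60
    totals = defaultdict(int)
    for event in events:
        # Same strict comparisons as the FilterStartTime/FilterEndTime steps.
        if not (start_ms < event['timestamp'] < stop_ms):
            continue
        # Event timestamps are in milliseconds; windows are sized in seconds.
        event_time_s = event['timestamp'] / 1000.0
        window_start_s = int(event_time_s // window_size_s) * window_size_s
        totals[(event['team'], window_start_s)] += event['score']
    return dict(totals)


# Hypothetical sample events (timestamps are milliseconds since the epoch).
events = [
    {'user': 'ann', 'team': 'red', 'score': 5, 'timestamp': 1000},
    {'user': 'bob', 'team': 'red', 'score': 7, 'timestamp': 1800000},
    {'user': 'ann', 'team': 'red', 'score': 3, 'timestamp': 3600000},
    {'user': 'cat', 'team': 'blue', 'score': 4, 'timestamp': 4000000},
    {'user': 'cat', 'team': 'blue', 'score': 9, 'timestamp': 8000000},
]

user_totals = sum_scores_by_field(events, 'user')
team_totals = hourly_team_scores(events, start_ms=0, stop_ms=7200000)
```

With these sample events, the last event falls outside the filter range and is dropped, and the remaining team scores are split across the windows starting at 0 and 3600 seconds. In the real pipeline the grouping by window is implicit: `beam.WindowInto` tags each element, and `CombinePerKey` then combines per key and per window.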


[3/3] beam-site git commit: This closes #220

Posted by al...@apache.org.
This closes #220


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/3cafa86a
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/3cafa86a
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/3cafa86a

Branch: refs/heads/asf-site
Commit: 3cafa86a038320ad56c7ed04c4b1be2d04f606f7
Parents: 496f240 b76eb56
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 2 18:26:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 18:26:46 2017 -0700

----------------------------------------------------------------------
 .../mobile-gaming-example/index.html            | 161 ++++++++++++++++++-
 src/get-started/mobile-gaming-example.md        | 155 +++++++++++++++++-
 2 files changed, 310 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[2/3] beam-site git commit: Regenerate website

Posted by al...@apache.org.
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/b76eb56b
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/b76eb56b
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/b76eb56b

Branch: refs/heads/asf-site
Commit: b76eb56b6986f99aa73ce38f8e719803ab18fb4e
Parents: bf62dc9
Author: Ahmet Altay <al...@google.com>
Authored: Tue May 2 18:26:45 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Tue May 2 18:26:45 2017 -0700

----------------------------------------------------------------------
 .../mobile-gaming-example/index.html            | 161 ++++++++++++++++++-
 1 file changed, 157 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam-site/blob/b76eb56b/content/get-started/mobile-gaming-example/index.html
----------------------------------------------------------------------
diff --git a/content/get-started/mobile-gaming-example/index.html b/content/get-started/mobile-gaming-example/index.html
index 054024c..7a3fbb4 100644
--- a/content/get-started/mobile-gaming-example/index.html
+++ b/content/get-started/mobile-gaming-example/index.html
@@ -188,12 +188,24 @@
   </li>
 </ul>
 
+<nav class="language-switcher">
+  <strong>Adapt for:</strong> 
+  <ul>
+    <li data-type="language-java">Java SDK</li>
+    <li data-type="language-py">Python SDK</li>
+  </ul>
+</nav>
+
 <p>This section provides a walkthrough of a series of example Apache Beam pipelines that demonstrate more complex functionality than the basic <a href="/get-started/wordcount-example">WordCount</a> examples. The pipelines in this section process data from a hypothetical game that users play on their mobile phones. The pipelines demonstrate processing at increasing levels of complexity; the first pipeline, for example, shows how to run a batch analysis job to obtain relatively simple score data, while the later pipelines use Beam’s windowing and triggers features to provide low-latency data analysis and more complex intelligence about users’ play patterns.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note</strong>: These examples assume some familiarity with the Beam programming model. If you haven’t already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note</strong>: These examples assume some familiarity with the Beam programming model. If you haven’t already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing.</p>
+</blockquote>
+
 <p>Every time a user plays an instance of our hypothetical mobile game, they generate a data event. Each data event consists of the following information:</p>
 
 <ul>
@@ -224,10 +236,14 @@
 
 <p>The <code class="highlighter-rouge">UserScore</code> pipeline is the simplest example for processing mobile game data. <code class="highlighter-rouge">UserScore</code> determines the total score per user over a finite data set (for example, one day’s worth of scores stored on the game server). Pipelines like <code class="highlighter-rouge">UserScore</code> are best run periodically after all relevant data has been gathered. For example, <code class="highlighter-rouge">UserScore</code> could run as a nightly job over data gathered during that day.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java">UserScore on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py">UserScore on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-userscore-do">What Does UserScore Do?</h3>
 
 <p>In a day’s worth of scoring data, each user ID may have multiple records (if the user plays more than one instance of the game during the analysis window), each with their own score value and timestamp. If we want to determine the total score over all the instances a user plays during the day, our pipeline will need to group all the records together per individual user.</p>
@@ -283,6 +299,28 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">ExtractAndSumScore</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
+  <span class="s">"""A transform to extract key/score information and sum the scores.
+  The constructor argument `field` determines whether 'team' or 'user' info is
+  extracted.
+  """</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">field</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span class="n">ExtractAndSumScore</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">field</span> <span class="o">=</span> <span class="n">field</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
+    <span class="k">return</span> <span class="p">(</span><span class="n">pcoll</span>
+            <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">info</span><span class="p">:</span> <span class="p">(</span><span class="n">info</span><span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">field</span><span class="p">],</span> <span class="n">info</span><span class="p">[</span><span class="s">'score'</span><span class="p">]))</span>
+            <span class="o">|</span> <span class="n">beam</span><span class="o">.</span><span class="n">CombinePerKey</span><span class="p">(</span><span class="n">sum_ints</span><span class="p">))</span>
+
+<span class="k">def</span> <span class="nf">configure_bigquery_write</span><span class="p">():</span>
+  <span class="k">return</span> <span class="p">[</span>
+      <span class="p">(</span><span class="s">'user'</span><span class="p">,</span> <span class="s">'STRING'</span><span class="p">,</span> <span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> <span class="n">e</span><span class="p">[</span><span class="mi">0</span><span class="p">]),</span>
+      <span class="p">(</span><span class="s">'total_score'</span><span class="p">,</span> <span class="s">'INTEGER'</span><span class="p">,</span> <span class="k">lambda</span> <span class="n">e</span><span class="p">:</span> <span class="n">e</span><span class="p">[</span><span class="mi">1</span><span class="p">]),</span>
+  <span class="p">]</span>
+</code></pre>
+</div>
+
 <p><code class="highlighter-rouge">ExtractAndSumScore</code> is written to be more general, in that you can pass in the field by which you want to group the data (in the case of our game, by unique user or unique team). This means we can re-use <code class="highlighter-rouge">ExtractAndSumScore</code> in other pipelines that group score data by team, for example.</p>
 
 <p>Here’s the main method of <code class="highlighter-rouge">UserScore</code>, showing how we apply all three steps of the pipeline:</p>
@@ -307,6 +345,25 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="n">argv</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+  <span class="s">"""Main entry point; defines and runs the user_score pipeline."""</span>
+  
+  <span class="o">...</span>
+
+  <span class="n">pipeline_options</span> <span class="o">=</span> <span class="n">PipelineOptions</span><span class="p">(</span><span class="n">pipeline_args</span><span class="p">)</span>
+  <span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">pipeline_options</span><span class="p">)</span>
+
+  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: disable=expression-not-assigned</span>
+   <span class="o">|</span> <span class="n">ReadFromText</span><span class="p">(</span><span class="n">known_args</span><span class="o">.</span><span class="nb">input</span><span class="p">)</span> <span class="c"># Read events from a file and parse them.</span>
+   <span class="o">|</span> <span class="n">UserScore</span><span class="p">()</span>
+   <span class="o">|</span> <span class="n">WriteToBigQuery</span><span class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span class="n">table_name</span><span class="p">,</span> <span class="n">known_args</span><span class="o">.</span><span class="n">dataset</span><span class="p">,</span> <span class="n">configure_bigquery_write</span><span class="p">()))</span>
+
+  <span class="n">result</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">run</span><span class="p">()</span>
+  <span class="n">result</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span>
+</code></pre>
+</div>
+
 <h3 id="working-with-the-results">Working with the Results</h3>
 
 <p><code class="highlighter-rouge">UserScore</code> writes the data to a BigQuery table (called <code class="highlighter-rouge">user_score</code> by default). With the data in the BigQuery table, we might perform a further interactive analysis, such as querying for a list of the N top-scoring users for a given day.</p>
@@ -344,10 +401,14 @@
 
 <p>Like <code class="highlighter-rouge">UserScore</code>, <code class="highlighter-rouge">HourlyTeamScore</code> is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). The pipeline reads a fixed data set from a file, and writes the results to a Google Cloud BigQuery table, just like <code class="highlighter-rouge">UserScore</code>.</p>
 
-<blockquote>
+<blockquote class="language-java">
   <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java">HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p>
 </blockquote>
 
+<blockquote class="language-py">
+  <p><strong>Note:</strong> See <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py">HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p>
+</blockquote>
+
 <h3 id="what-does-hourlyteamscore-do">What Does HourlyTeamScore Do?</h3>
 
 <p><code class="highlighter-rouge">HourlyTeamScore</code> calculates the total score per team, per hour, in a fixed data set (such as one day’s worth of data).</p>
@@ -382,7 +443,11 @@
 
 <p>Beam’s windowing feature uses the <a href="/documentation/programming-guide/#pctimestamps">intrinsic timestamp information</a> attached to each element of a <code class="highlighter-rouge">PCollection</code>. Because we want our pipeline to window based on <em>event time</em>, we <strong>must first extract the timestamp</strong> that’s embedded in each data record and apply it to the corresponding element in the <code class="highlighter-rouge">PCollection</code> of score data. Then, the pipeline can <strong>apply the windowing function</strong> to divide the <code class="highlighter-rouge">PCollection</code> into logical windows.</p>
 
-<p>Here’s the code, which shows how <code class="highlighter-rouge">HourlyTeamScore</code> uses the <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java">WithTimestamps</a> and <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java">Window</a> transforms to perform these operations:</p>
+<p class="language-java"><code class="highlighter-rouge">HourlyTeamScore</code> uses the <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java">WithTimestamps</a> and <a href="https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java">Window</a> transforms to perform these operations.</p>
+
+<p class="language-py"><code class="highlighter-rouge">HourlyTeamScore</code> uses the <code class="highlighter-rouge">FixedWindows</code> transform, found in <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py">window.py</a>, to perform these operations.</p>
+
+<p>The following code shows this:</p>
 
 <div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Add an element timestamp based on the event log, and apply fixed windowing.</span>
     <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"AddEventTimestamps"</span><span class="o">,</span>
@@ -392,6 +457,17 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="c"># Add an element timestamp based on the event log, and apply fixed windowing.</span>
+<span class="c"># Convert element['timestamp'] into seconds as expected by TimestampedValue.</span>
+<span class="o">|</span> <span class="s">'AddEventTimestamps'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">TimestampedValue</span><span class="p">(</span>
+        <span class="n">element</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> <span class="mf">1000.0</span><span class="p">))</span>
+<span class="c"># Convert window_duration into seconds as expected by FixedWindows.</span>
+<span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
+    <span class="n">size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">window_duration</span> <span class="o">*</span> <span class="mi">60</span><span class="p">))</span>
+</code></pre>
+</div>
+
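<p>The two unit conversions in the Python snippet above (milliseconds to seconds for the element timestamp, minutes to seconds for the window size) can be checked without a pipeline. The following is a minimal plain-Python sketch, not Beam code: the helper names <code class="highlighter-rouge">to_seconds</code> and <code class="highlighter-rouge">fixed_window_bounds</code> and the sample event are hypothetical, and the sketch assumes fixed windows are aligned to multiples of the window size counted from the epoch (that is, a zero offset).</p>

```python
def to_seconds(timestamp_ms):
    """Convert a millisecond event timestamp to float seconds."""
    return timestamp_ms / 1000.0


def fixed_window_bounds(timestamp_s, size_s):
    """Return the [start, end) bounds of the fixed window holding timestamp_s.

    Assumes windows are aligned to multiples of size_s since the epoch.
    """
    start = timestamp_s - (timestamp_s % size_s)
    return start, start + size_s


# Hypothetical values for illustration only.
window_duration_min = 60            # window length in minutes
size_s = window_duration_min * 60   # minutes -> seconds
event = {'team': 'red', 'score': 5, 'timestamp': 1447697463000}

bounds = fixed_window_bounds(to_seconds(event['timestamp']), size_s)
print(bounds)
```

<p>Every event whose timestamp falls between the two bounds lands in the same one-hour window, which is what later lets the pipeline sum scores per window.</p>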
 <p>Notice that the transforms the pipeline uses to specify the windowing are distinct from the actual data processing transforms (such as <code class="highlighter-rouge">ExtractAndSumScores</code>). This separation gives you some flexibility in designing your Beam pipeline, in that you can run existing transforms over datasets with different windowing characteristics.</p>
 
 <h4 id="filtering-based-on-event-time">Filtering Based On Event Time</h4>
@@ -413,6 +489,13 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="o">|</span> <span class="s">'FilterStartTime'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">&gt;</span> <span class="n">start_min_filter</span><span class="p">)</span>
+<span class="o">|</span> <span class="s">'FilterEndTime'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+    <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">&lt;</span> <span class="n">end_min_filter</span><span class="p">)</span>
+</code></pre>
+</div>
+
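<p>The effect of the two filters can be sketched in plain Python. This is an illustration under stated assumptions, not the example's actual helper: <code class="highlighter-rouge">string_to_timestamp_ms</code> is a hypothetical stand-in that assumes the cutoffs are given as <code class="highlighter-rouge">YYYY-MM-DD-HH-MM</code> strings in UTC and that event timestamps are in milliseconds. Note that both comparisons are strict, so an event exactly on either boundary is dropped.</p>

```python
import datetime


def string_to_timestamp_ms(datetime_str):
    """Parse 'YYYY-MM-DD-HH-MM' (assumed format, UTC) into epoch milliseconds."""
    dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d-%H-%M')
    epoch = datetime.datetime(1970, 1, 1)
    return (dt - epoch).total_seconds() * 1000.0


def in_analysis_period(element, start_ms, end_ms):
    """Mirror the two Filter steps: keep events strictly inside the period."""
    return start_ms < element['timestamp'] < end_ms


# Hypothetical cutoffs and events for illustration only.
start_ms = string_to_timestamp_ms('2015-11-16-16-00')
end_ms = string_to_timestamp_ms('2015-11-17-16-00')
events = [
    {'team': 'red', 'timestamp': start_ms - 1},   # late data from the day before
    {'team': 'red', 'timestamp': start_ms},       # on the boundary: dropped
    {'team': 'blue', 'timestamp': start_ms + 1},  # inside the period: kept
    {'team': 'blue', 'timestamp': end_ms},        # on the boundary: dropped
]
kept = [e for e in events if in_analysis_period(e, start_ms, end_ms)]
print(kept)
```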
 <h4 id="calculating-score-per-team-per-window">Calculating Score Per Team, Per Window</h4>
 
 <p><code class="highlighter-rouge">HourlyTeamScore</code> uses the same <code class="highlighter-rouge">ExtractAndSumScores</code> transform as the <code class="highlighter-rouge">UserScore</code> pipeline, but passes a different key (team, as opposed to user). Also, because the pipeline applies <code class="highlighter-rouge">ExtractAndSumScores</code> <em>after</em> applying fixed-time 1-hour windowing to the input data, the data gets grouped by both team <em>and</em> window. You can see the full sequence of transforms in <code class="highlighter-rouge">HourlyTeamScore</code>’s main method:</p>
@@ -464,6 +547,68 @@
 </code></pre>
 </div>
 
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span class="k">class</span> <span class="nc">HourlyTeamScore</span><span class="p">(</span><span class="n">beam</span><span class="o">.</span><span class="n">PTransform</span><span class="p">):</span>
+  <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_min</span><span class="p">,</span> <span class="n">stop_min</span><span class="p">,</span> <span class="n">window_duration</span><span class="p">):</span>
+    <span class="nb">super</span><span class="p">(</span><span class="n">HourlyTeamScore</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="n">__init__</span><span class="p">()</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">start_min</span> <span class="o">=</span> <span class="n">start_min</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">stop_min</span> <span class="o">=</span> <span class="n">stop_min</span>
+    <span class="bp">self</span><span class="o">.</span><span class="n">window_duration</span> <span class="o">=</span> <span class="n">window_duration</span>
+
+  <span class="k">def</span> <span class="nf">expand</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pcoll</span><span class="p">):</span>
+    <span class="n">start_min_filter</span> <span class="o">=</span> <span class="n">string_to_timestamp</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">start_min</span><span class="p">)</span>
+    <span class="n">end_min_filter</span> <span class="o">=</span> <span class="n">string_to_timestamp</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">stop_min</span><span class="p">)</span>
+
+    <span class="k">return</span> <span class="p">(</span>
+        <span class="n">pcoll</span>
+        <span class="o">|</span> <span class="s">'ParseGameEvent'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">ParDo</span><span class="p">(</span><span class="n">ParseEventFn</span><span class="p">())</span>
+        <span class="c"># Filter out data before and after the given times so that it is not</span>
+        <span class="c"># included in the calculations. As we collect data in batches (say, by</span>
+        <span class="c"># day), the batch for the day that we want to analyze could potentially</span>
+        <span class="c"># include some late-arriving data from the previous day. If so, we want</span>
+        <span class="c"># to weed it out. Similarly, if we include data from the following day</span>
+        <span class="c"># (to scoop up late-arriving events from the day we're analyzing), we</span>
+        <span class="c"># need to weed out events that fall after the time period we want to</span>
+        <span class="c"># analyze.</span>
+        <span class="o">|</span> <span class="s">'FilterStartTime'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">&gt;</span> <span class="n">start_min_filter</span><span class="p">)</span>
+        <span class="o">|</span> <span class="s">'FilterEndTime'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Filter</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">&lt;</span> <span class="n">end_min_filter</span><span class="p">)</span>
+        <span class="c"># Add an element timestamp based on the event log, and apply fixed</span>
+        <span class="c"># windowing.</span>
+        <span class="c"># Convert element['timestamp'] into seconds as expected by</span>
+        <span class="c"># TimestampedValue.</span>
+        <span class="o">|</span> <span class="s">'AddEventTimestamps'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">Map</span><span class="p">(</span>
+            <span class="k">lambda</span> <span class="n">element</span><span class="p">:</span> <span class="n">TimestampedValue</span><span class="p">(</span>
+                <span class="n">element</span><span class="p">,</span> <span class="n">element</span><span class="p">[</span><span class="s">'timestamp'</span><span class="p">]</span> <span class="o">/</span> <span class="mf">1000.0</span><span class="p">))</span>
+        <span class="c"># Convert window_duration into seconds as expected by FixedWindows.</span>
+        <span class="o">|</span> <span class="s">'FixedWindowsTeam'</span> <span class="o">&gt;&gt;</span> <span class="n">beam</span><span class="o">.</span><span class="n">WindowInto</span><span class="p">(</span><span class="n">FixedWindows</span><span class="p">(</span>
+            <span class="n">size</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">window_duration</span> <span class="o">*</span> <span class="mi">60</span><span class="p">))</span>
+        <span class="c"># Extract and sum teamname/score pairs from the event data.</span>
+        <span class="o">|</span> <span class="s">'ExtractTeamScore'</span> <span class="o">&gt;&gt;</span> <span class="n">ExtractAndSumScore</span><span class="p">(</span><span class="s">'team'</span><span class="p">))</span>
+
+
+<span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="n">argv</span><span class="o">=</span><span class="bp">None</span><span class="p">):</span>
+  <span class="s">"""Main entry point; defines and runs the hourly_team_score pipeline."""</span>
+  <span class="o">...</span>
+
+  <span class="n">known_args</span><span class="p">,</span> <span class="n">pipeline_args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_known_args</span><span class="p">(</span><span class="n">argv</span><span class="p">)</span>
+
+  <span class="n">pipeline_options</span> <span class="o">=</span> <span class="n">PipelineOptions</span><span class="p">(</span><span class="n">pipeline_args</span><span class="p">)</span>
+  <span class="n">p</span> <span class="o">=</span> <span class="n">beam</span><span class="o">.</span><span class="n">Pipeline</span><span class="p">(</span><span class="n">options</span><span class="o">=</span><span class="n">pipeline_options</span><span class="p">)</span>
+  <span class="n">pipeline_options</span><span class="o">.</span><span class="n">view_as</span><span class="p">(</span><span class="n">SetupOptions</span><span class="p">)</span><span class="o">.</span><span class="n">save_main_session</span> <span class="o">=</span> <span class="bp">True</span>
+
+  <span class="p">(</span><span class="n">p</span>  <span class="c"># pylint: disable=expression-not-assigned</span>
+   <span class="o">|</span> <span class="n">ReadFromText</span><span class="p">(</span><span class="n">known_args</span><span class="o">.</span><span class="nb">input</span><span class="p">)</span>
+   <span class="o">|</span> <span class="n">HourlyTeamScore</span><span class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span class="n">start_min</span><span class="p">,</span> <span class="n">known_args</span><span class="o">.</span><span class="n">stop_min</span><span class="p">,</span> <span class="n">known_args</span><span class="o">.</span><span class="n">window_duration</span><span class="p">)</span>
+   <span class="o">|</span> <span class="n">WriteWindowedToBigQuery</span><span class="p">(</span>
+       <span class="n">known_args</span><span class="o">.</span><span class="n">table_name</span><span class="p">,</span> <span class="n">known_args</span><span class="o">.</span><span class="n">dataset</span><span class="p">,</span> <span class="n">configure_bigquery_write</span><span class="p">()))</span>
+
+  <span class="n">result</span> <span class="o">=</span> <span class="n">p</span><span class="o">.</span><span class="n">run</span><span class="p">()</span>
+  <span class="n">result</span><span class="o">.</span><span class="n">wait_until_finish</span><span class="p">()</span>
+</code></pre>
+</div>
+
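<p>To make "grouped by both team <em>and</em> window" concrete, here is a plain-Python sketch of the result the windowed sum produces. It is not how Beam implements the grouping; the function name <code class="highlighter-rouge">sum_scores_by_team_and_window</code> and the sample events are hypothetical, and windows are assumed to be aligned to multiples of the window size since the epoch.</p>

```python
from collections import defaultdict


def sum_scores_by_team_and_window(events, window_size_s):
    """Sum scores keyed by (team, window start in seconds).

    Event timestamps are assumed to be in milliseconds.
    """
    totals = defaultdict(int)
    for event in events:
        timestamp_s = event['timestamp'] / 1000.0
        window_start = timestamp_s - (timestamp_s % window_size_s)
        totals[(event['team'], window_start)] += event['score']
    return dict(totals)


# Hypothetical events for illustration only.
events = [
    {'team': 'red', 'score': 3, 'timestamp': 0},
    {'team': 'red', 'score': 4, 'timestamp': 1800000},  # 30 minutes in
    {'team': 'red', 'score': 5, 'timestamp': 3600000},  # start of the next hour
    {'team': 'blue', 'score': 7, 'timestamp': 60000},   # 1 minute in
]
totals = sum_scores_by_team_and_window(events, 3600)
print(totals)
```

<p>The same team appears under two keys because its events span two one-hour windows; with a single global window, the key would reduce to the team alone, as in <code class="highlighter-rouge">UserScore</code>.</p>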
 <h3 id="limitations-1">Limitations</h3>
 
 <p>As written, <code class="highlighter-rouge">HourlyTeamScore</code> still has a limitation:</p>
@@ -474,6 +619,10 @@
 
 <h2 id="leaderboard-streaming-processing-with-real-time-game-data">LeaderBoard: Streaming Processing with Real-Time Game Data</h2>
 
+<blockquote>
+  <p><strong>Note:</strong> This example currently exists in Java only.</p>
+</blockquote>
+
 <p>One way we can help address the latency issue present in the <code class="highlighter-rouge">UserScore</code> and <code class="highlighter-rouge">HourlyTeamScore</code> pipelines is by reading the score data from an unbounded source. The <code class="highlighter-rouge">LeaderBoard</code> pipeline introduces streaming processing by reading the game score data from an unbounded source that produces an infinite amount of data, rather than from a file on the game server.</p>
 
 <p>The <code class="highlighter-rouge">LeaderBoard</code> pipeline also demonstrates how to process game score data with respect to both <em>processing time</em> and <em>event time</em>. <code class="highlighter-rouge">LeaderBoard</code> outputs data about both individual user scores and about team scores, each with respect to a different time frame.</p>
@@ -607,6 +756,10 @@
 
 <h2 id="gamestats-abuse-detection-and-usage-analysis">GameStats: Abuse Detection and Usage Analysis</h2>
 
+<blockquote>
+  <p><strong>Note:</strong> This example currently exists in Java only.</p>
+</blockquote>
+
 <p>While <code class="highlighter-rouge">LeaderBoard</code> demonstrates how to use basic windowing and triggers to perform low-latency and flexible data analysis, we can use more advanced windowing techniques to perform more comprehensive analysis. This might include some calculations designed to detect system abuse (like spam) or to gain insight into user behavior. The <code class="highlighter-rouge">GameStats</code> pipeline builds on the low-latency functionality in <code class="highlighter-rouge">LeaderBoard</code> to demonstrate how you can use Beam to perform this kind of advanced analysis.</p>
 
 <p>Like <code class="highlighter-rouge">LeaderBoard</code>, <code class="highlighter-rouge">GameStats</code> reads data from an unbounded source. It is best thought of as an ongoing job that provides insight into the game as users play.</p>