Posted to commits@beam.apache.org by me...@apache.org on 2018/05/07 17:08:39 UTC

[beam-site] 01/02: Updates for Python streaming

This is an automated email from the ASF dual-hosted git repository.

mergebot-role pushed a commit to branch mergebot
in repository https://gitbox.apache.org/repos/asf/beam-site.git

commit 6bff9dca52415248c5c3d9469c282bc0e7e016d2
Author: melissa <me...@google.com>
AuthorDate: Tue Apr 3 13:51:02 2018 -0700

    Updates for Python streaming
---
 src/_includes/section-menu/sdks.html       |   1 +
 src/documentation/programming-guide.md     | 115 ++++++-----
 src/documentation/runners/dataflow.md      |  71 ++++---
 src/documentation/runners/direct.md        |  12 +-
 src/documentation/sdks/python-streaming.md | 187 ++++++++++++++++++
 src/documentation/sdks/python.md           |  20 +-
 src/get-started/quickstart-java.md         |   3 +-
 src/get-started/quickstart-py.md           |  54 +++++-
 src/get-started/wordcount-example.md       | 295 +++++++++++++++++++----------
 9 files changed, 556 insertions(+), 202 deletions(-)

diff --git a/src/_includes/section-menu/sdks.html b/src/_includes/section-menu/sdks.html
index 729258f..ddcd1e2 100644
--- a/src/_includes/section-menu/sdks.html
+++ b/src/_includes/section-menu/sdks.html
@@ -22,6 +22,7 @@
                                                                                                                                    width="14" height="14"
                                                                                                                                    alt="External link."></a>
     </li>
+    <li><a href="{{ site.baseurl }}/documentation/sdks/python-streaming/">Python streaming pipelines</a></li>
     <li><a href="{{ site.baseurl }}/documentation/sdks/python-type-safety/">Ensuring Python type safety</a></li>
     <li><a href="{{ site.baseurl }}/documentation/sdks/python-pipeline-dependencies/">Managing pipeline dependencies</a></li>
     <li><a href="{{ site.baseurl }}/documentation/sdks/python-custom-io/">Creating new sources and sinks</a></li>
diff --git a/src/documentation/programming-guide.md b/src/documentation/programming-guide.md
index 37044cd..b3799cc 100644
--- a/src/documentation/programming-guide.md
+++ b/src/documentation/programming-guide.md
@@ -2390,9 +2390,6 @@ with a `DoFn` to attach the timestamps to each element in your `PCollection`.
 
 ## 8. Triggers {#triggers}
 
-> **NOTE:** This content applies only to the Beam SDK for Java. The Beam SDK for
-> Python does not support triggers.
-
 When collecting and grouping data into windows, Beam uses **triggers** to
 determine when to emit the aggregated results of each window (referred to as a
 *pane*). If you use Beam's default windowing configuration and [default
@@ -2454,12 +2451,12 @@ trigger emits the contents of a window after the
 [watermark](#watermarks-and-late-data) passes the end of the window, based on the
 timestamps attached to the data elements. The watermark is a global progress
 metric, and is Beam's notion of input completeness within your pipeline at any
-given point. `AfterWatermark.pastEndOfWindow()` *only* fires when the watermark
-passes the end of the window.
+given point. <span class="language-java">`AfterWatermark.pastEndOfWindow()`</span>
+<span class="language-py">`AfterWatermark`</span> *only* fires when the
+watermark passes the end of the window.
 
-In addition, you can use `.withEarlyFirings(trigger)` and
-`.withLateFirings(trigger)` to configure triggers that fire if your pipeline
-receives data before or after the end of the window.
+In addition, you can configure triggers that fire if your pipeline receives data
+before or after the end of the window.
 
 The following example shows a billing scenario, and uses both early and late
 firings:
@@ -2476,8 +2473,8 @@ firings:
       .withLateFirings(AfterPane.elementCountAtLeast(1))
 ```
 ```py
-  # The Beam SDK for Python does not support triggers.
-```
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_early_late_triggers
+%}```
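
For the Python half of this billing scenario, the snippet tag above is the authoritative sample; as a rough, non-authoritative sketch (assuming the `AfterWatermark`, `AfterProcessingTime`, and `AfterCount` classes in `apache_beam.transforms.trigger`), the same early/late configuration looks like:

```py
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterProcessingTime, AfterWatermark)

with beam.Pipeline() as p:
  windowed = (
      p
      | beam.Create([('joe', 3.75), ('sue', 1.50)])  # stand-in billing events
      | beam.WindowInto(
          window.FixedWindows(60),
          trigger=AfterWatermark(
              early=AfterProcessingTime(delay=60),  # speculative pane after about a minute
              late=AfterCount(1)),                  # re-fire for each late element
          accumulation_mode=AccumulationMode.ACCUMULATING))
```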
 
 #### 8.1.1. Default trigger {#default-trigger}
 
@@ -2494,9 +2491,10 @@ modifying this behavior.
 ### 8.2. Processing time triggers {#processing-time-triggers}
 
 The `AfterProcessingTime` trigger operates on *processing time*. For example,
-the `AfterProcessingTime.pastFirstElementInPane() ` trigger emits a window after
-a certain amount of processing time has passed since data was received. The
-processing time is determined by the system clock, rather than the data
+the <span class="language-java">`AfterProcessingTime.pastFirstElementInPane()`</span>
+<span class="language-py">`AfterProcessingTime`</span> trigger emits a window
+after a certain amount of processing time has passed since data was received.
+The processing time is determined by the system clock, rather than the data
 element's timestamp.
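
As a small sketch (assuming the Python trigger's `delay` argument is expressed in seconds), constructing a processing-time trigger on its own looks like:

```py
from apache_beam.transforms.trigger import AfterProcessingTime

# Fires a pane roughly 30 seconds of wall-clock time after the first element
# in the pane was received, regardless of the elements' own timestamps.
processing_time_trigger = AfterProcessingTime(delay=30)
```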
 
 The `AfterProcessingTime` trigger is useful for triggering early results from a
@@ -2505,26 +2503,41 @@ window.
 
 ### 8.3. Data-driven triggers {#data-driven-triggers}
 
-Beam provides one data-driven trigger, `AfterPane.elementCountAtLeast()`. This
-trigger works on an element count; it fires after the current pane has collected
-at least *N* elements. This allows a window to emit early results (before all
-the data has accumulated), which can be particularly useful if you are using a
-single global window.
-
-It is important to note that if, for example, you use `.elementCountAtLeast(50)`
-and only 32 elements arrive, those 32 elements sit around forever. If the 32
-elements are important to you, consider using [composite
-triggers](#composite-triggers) to combine multiple conditions. This allows you
-to specify multiple firing conditions such as “fire either when I receive 50
-elements, or every 1 second”.
+Beam provides one data-driven trigger,
+<span class="language-java">`AfterPane.elementCountAtLeast()`</span>
+<span class="language-py">`AfterCount`</span>. This trigger works on an element
+count; it fires after the current pane has collected at least *N* elements. This
+allows a window to emit early results (before all the data has accumulated),
+which can be particularly useful if you are using a single global window.
+
+It is important to note that if, for example, you specify
+<span class="language-java">`.elementCountAtLeast(50)`</span>
+<span class="language-py">AfterCount(50)</span> and only 32 elements arrive,
+those 32 elements sit around forever. If the 32 elements are important to you,
+consider using [composite triggers](#composite-triggers) to combine multiple
+conditions. This allows you to specify multiple firing conditions such as “fire
+either when I receive 50 elements, or every 1 second”.
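
A hedged sketch of that composite condition in Python (assuming the `Repeatedly` and `AfterAny` combinators in `apache_beam.transforms.trigger`):

```py
from apache_beam.transforms.trigger import (
    AfterAny, AfterCount, AfterProcessingTime, Repeatedly)

# Fire whenever either condition is met (at least 50 elements have arrived,
# or one second of processing time has passed), and keep repeating so later
# panes fire under the same conditions.
fire_often = Repeatedly(AfterAny(AfterCount(50), AfterProcessingTime(delay=1)))
```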
 
 ### 8.4. Setting a trigger {#setting-a-trigger}
 
-When you set a windowing function for a `PCollection` by using the `Window`
+When you set a windowing function for a `PCollection` by using the
+<span class="language-java">`Window`</span><span class="language-py">`WindowInto`</span>
 transform, you can also specify a trigger.
 
+{:.language-java}
 You set the trigger(s) for a `PCollection` by invoking the method
-`.triggering()` on the result of your `Window.into()` transform, as follows:
+`.triggering()` on the result of your `Window.into()` transform. This code
+sample sets a time-based trigger for a `PCollection`, which emits results one
+minute after the first element in that window has been processed.  The last line
+in the code sample, `.discardingFiredPanes()`, sets the window's **accumulation
+mode**.
+
+{:.language-py}
+You set the trigger(s) for a `PCollection` by setting the `trigger` parameter
+when you use the `WindowInto` transform. This code sample sets a time-based
+trigger for a `PCollection`, which emits results one minute after the first
+element in that window has been processed. The `accumulation_mode` parameter
+sets the window's **accumulation mode**.
 
 ```java
   PCollection<String> pc = ...;
@@ -2534,13 +2547,8 @@ You set the trigger(s) for a `PCollection` by invoking the method
                                .discardingFiredPanes());
 ```
 ```py
-  # The Beam SDK for Python does not support triggers.
-```
-
-This code sample sets a time-based trigger for a `PCollection`, which emits
-results one minute after the first element in that window has been processed.
-The last line in the code sample, `.discardingFiredPanes()`, is the window's
-**accumulation mode**.
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_setting_trigger
+%}```
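
If the rendered sample above is not handy, the Python form described in the preceding paragraph is roughly the following (a sketch that assumes `WindowInto` accepts `trigger` and `accumulation_mode` keyword arguments; it is not the canonical snippet):

```py
import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime

with beam.Pipeline() as p:
  pc = (
      p
      | beam.Create(['cat', 'dog', 'cat'])
      | beam.WindowInto(
          window.FixedWindows(60),
          # Emit results one minute after the first element in the window.
          trigger=AfterProcessingTime(delay=60),
          # Discard each pane's contents once the trigger fires.
          accumulation_mode=AccumulationMode.DISCARDING))
```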
 
 #### 8.4.1. Window accumulation modes {#window-accumulation-modes}
 
@@ -2550,10 +2558,17 @@ pane. Since a trigger can fire multiple times, the accumulation mode determines
 whether the system *accumulates* the window panes as the trigger fires, or
 *discards* them.
 
+{:.language-java}
 To set a window to accumulate the panes that are produced when the trigger
 fires, invoke `.accumulatingFiredPanes()` when you set the trigger. To set a
 window to discard fired panes, invoke `.discardingFiredPanes()`.
 
+{:.language-py}
+To set a window to accumulate the panes that are produced when the trigger
+fires, set the `accumulation_mode` parameter to `ACCUMULATING` when you set the
+trigger. To set a window to discard fired panes, set `accumulation_mode` to
+`DISCARDING`.
+
 Let's look at an example that uses a `PCollection` with fixed-time windowing and a
 data-based trigger. This is something you might do if, for example, each window
 represented a ten-minute running average, but you wanted to display the current
@@ -2572,9 +2587,9 @@ we'll assume that the events all arrive in the pipeline in order.
 
 ##### 8.4.1.1. Accumulating mode {#accumulating-mode}
 
-If our trigger is set to `.accumulatingFiredPanes`, the trigger emits the
-following values each time it fires. Keep in mind that the trigger fires every
-time three elements arrive:
+If our trigger is set to accumulating mode, the trigger emits the following
+values each time it fires. Keep in mind that the trigger fires every time three
+elements arrive:
 
 ```
   First trigger firing:  [5, 8, 3]
@@ -2585,8 +2600,8 @@ time three elements arrive:
 
 ##### 8.4.1.2. Discarding mode {#discarding-mode}
 
-If our trigger is set to `.discardingFiredPanes`, the trigger emits the
-following values on each firing:
+If our trigger is set to discarding mode, the trigger emits the following values
+on each firing:
 
 ```
   First trigger firing:  [5, 8, 3]
@@ -2596,6 +2611,8 @@ following values on each firing:
 
 #### 8.4.2. Handling late data {#handling-late-data}
 
+> The Beam SDK for Python does not currently support allowed lateness.
+
 If you want your pipeline to process data that arrives after the watermark
 passes the end of the window, you can apply an *allowed lateness* when you set
 your windowing configuration. This gives your trigger the opportunity to react
@@ -2613,7 +2630,7 @@ windowing function:
                               .withAllowedLateness(Duration.standardMinutes(30));
 ```
 ```py
-  # The Beam SDK for Python does not support triggers.
+  # The Beam SDK for Python does not currently support allowed lateness.
 ```
 
 This allowed lateness propagates to all `PCollection`s derived as a result of
@@ -2652,7 +2669,7 @@ Beam includes the following composite triggers:
 *   `orFinally` can serve as a final condition to cause any trigger to fire one
     final time and never fire again.
 
-#### 8.5.2. Composition with AfterWatermark.pastEndOfWindow {#afterwatermark-pastendofwindow}
+#### 8.5.2. Composition with AfterWatermark {#composite-afterwatermark}
 
 Some of the most useful composite triggers fire a single time when Beam
 estimates that all the data has arrived (i.e. when the watermark passes the end
@@ -2660,15 +2677,19 @@ of the window) combined with either, or both, of the following:
 
 *   Speculative firings that precede the watermark passing the end of the window
     to allow faster processing of partial results.
+
 *   Late firings that happen after the watermark passes the end of the window,
     to allow for handling late-arriving data
 
-You can express this pattern using `AfterWatermark.pastEndOfWindow`. For
-example, the following example trigger code fires on the following conditions:
+You can express this pattern using `AfterWatermark`. For example, the following
+example trigger code fires on the following conditions:
 
 *   On Beam's estimate that all the data has arrived (the watermark passes the
     end of the window)
+
 *   Any time late data arrives, after a ten-minute delay
+
+{:.language-java}
 *   After two days, we assume no more data of interest will arrive, and the
     trigger stops executing
 
@@ -2683,8 +2704,8 @@ example, the following example trigger code fires on the following conditions:
       .withAllowedLateness(Duration.standardDays(2)));
 ```
 ```py
-  # The Beam SDK for Python does not support triggers.
-```
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_composite_triggers
+%}```
 
 #### 8.5.3. Other composite triggers {#other-composite-triggers}
 
@@ -2698,5 +2719,5 @@ elements, or after a minute.
       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
 ```
 ```py
-  # The Beam SDK for Python does not support triggers.
-```
+{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets_test.py tag:model_other_composite_triggers
+%}```
diff --git a/src/documentation/runners/dataflow.md b/src/documentation/runners/dataflow.md
index 2ff8198..1487da2 100644
--- a/src/documentation/runners/dataflow.md
+++ b/src/documentation/runners/dataflow.md
@@ -25,32 +25,23 @@ The Cloud Dataflow Runner and service are suitable for large scale, continuous j
 
 The [Beam Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/) documents the supported capabilities of the Cloud Dataflow Runner.
 
-## Cloud Dataflow Runner prerequisites and setup
-To use the Cloud Dataflow Runner, you must complete the following setup:
+## Cloud Dataflow Runner prerequisites and setup {#setup}
 
-1. Select or create a Google Cloud Platform Console project.
+To use the Cloud Dataflow Runner, you must complete the setup in the *Before you
+begin* section of the [Cloud Dataflow quickstart](https://cloud.google.com/dataflow/docs/quickstarts)
+for your chosen language.
 
+1. Select or create a Google Cloud Platform Console project.
 2. Enable billing for your project.
-
 3. Enable the required Google Cloud APIs: Cloud Dataflow, Compute Engine,
    Stackdriver Logging, Cloud Storage, Cloud Storage JSON, and Cloud Resource
    Manager. You may need to enable additional APIs (such as BigQuery, Cloud
    Pub/Sub, or Cloud Datastore) if you use them in your pipeline code.
+4. Authenticate with Google Cloud Platform.
+5. Install the Google Cloud SDK.
+6. Create a Cloud Storage bucket.
 
-4. Install the Google Cloud SDK.
-
-5. Create a Cloud Storage bucket.
-    * In the Google Cloud Platform Console, go to the Cloud Storage browser.
-    * Click **Create bucket**.
-    * In the **Create bucket** dialog, specify the following attributes:
-      * _Name_: A unique bucket name. Do not include sensitive information in the bucket name, as the bucket namespace is global and publicly visible.
-      * _Storage class_: Multi-Regional
-      * _Location_:  Choose your desired location
-    * Click **Create**.
-
-For more information, see the *Before you begin* section of the [Cloud Dataflow quickstarts](https://cloud.google.com/dataflow/docs/quickstarts).
-
-### Specify your dependency
+### Specify your dependency {#dependency}
 
 <span class="language-java">When using Java, you must specify your dependency on the Cloud Dataflow Runner in your `pom.xml`.</span>
 ```java
@@ -64,7 +55,7 @@ For more information, see the *Before you begin* section of the [Cloud Dataflow
 
 <span class="language-py">This section is not applicable to the Beam SDK for Python.</span>
 
-### Self executing JAR
+### Self executing JAR {#self-executing-jar}
 
 {:.language-py}
 This section is not applicable to the Beam SDK for Python.
@@ -118,15 +109,7 @@ java -jar target/beam-examples-bundled-1.0.0.jar \
   --tempLocation=gs://<YOUR_GCS_BUCKET>/temp/
 ```
 
-### Authentication
-
-Before running your pipeline, you must authenticate with the Google Cloud Platform. Run the following command to get [Application Default Credentials](https://developers.google.com/identity/protocols/application-default-credentials).
-
-```
-gcloud auth application-default login
-```
-
-## Pipeline options for the Cloud Dataflow Runner
+## Pipeline options for the Cloud Dataflow Runner {#pipeline-options}
 
 <span class="language-java">When executing your pipeline with the Cloud Dataflow Runner (Java), consider these common pipeline options.</span>
 <span class="language-py">When executing your pipeline with the Cloud Dataflow Runner (Python), consider these common pipeline options.</span>
@@ -150,8 +133,7 @@ gcloud auth application-default login
   <td>If not set, defaults to the default project in the current environment. The default project is set via <code>gcloud</code>.</td>
 </tr>
 
-<!-- Only show for Java -->
-<tr class="language-java">
+<tr>
   <td><code>streaming</code></td>
   <td>Whether streaming mode is enabled or disabled; <code>true</code> if enabled. Set to <code>true</code> if running pipelines with unbounded <code>PCollection</code>s.</td>
   <td><code>false</code></td>
@@ -212,17 +194,34 @@ See the reference documentation for the
 <span class="language-py">[`PipelineOptions`]({{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions)</span>
 interface (and any subinterfaces) for additional pipeline configuration options.
 
-## Additional information and caveats
+## Additional information and caveats {#additional-info}
 
-### Monitoring your job
+### Monitoring your job {#monitoring}
 
 While your pipeline executes, you can monitor the job's progress, view details on execution, and receive updates on the pipeline's results by using the [Dataflow Monitoring Interface](https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf) or the [Dataflow Command-line Interface](https://cloud.google.com/dataflow/pipelines/dataflow-command-line-intf).
 
-### Blocking Execution
+### Blocking Execution {#blocking-execution}
 
 To block until your job completes, call <span class="language-java"><code>waitToFinish</code></span><span class="language-py"><code>wait_until_finish</code></span> on the `PipelineResult` returned from `pipeline.run()`. The Cloud Dataflow Runner prints job status updates and console messages while it waits. While the result is connected to the active job, note that pressing **Ctrl+C** from the command line does not cancel your job. To cancel the job, you can use the [Dataflow Monitoring  [...]
 
-### Streaming Execution
+### Streaming Execution {#streaming-execution}
+
+If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.
+
+When using streaming execution, keep the following considerations in mind.
+
+1. Streaming pipelines do not terminate unless explicitly cancelled by the user.
+   You can cancel your streaming job from the [Dataflow Monitoring Interface](https://cloud.google.com/dataflow/pipelines/stopping-a-pipeline)
+   or with the [Dataflow Command-line Interface](https://cloud.google.com/dataflow/pipelines/dataflow-command-line-intf)
+   ([gcloud dataflow jobs cancel](https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/cancel)
+   command).
+
+2. Streaming jobs use a Google Compute Engine [machine type](https://cloud.google.com/compute/docs/machine-types)
+   of `n1-standard-2` or higher by default. You must not override this, as
+   `n1-standard-2` is the minimum required machine type for running streaming
+   jobs.
+
+3. Streaming execution [pricing](https://cloud.google.com/dataflow/pricing)
+   differs from batch execution.
+
 
-<span class="language-java">If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.</span>
-<span class="language-py">The Beam SDK for Python does not currently support streaming pipelines.</span>
diff --git a/src/documentation/runners/direct.md b/src/documentation/runners/direct.md
index eeea747..70b7ccf 100644
--- a/src/documentation/runners/direct.md
+++ b/src/documentation/runners/direct.md
@@ -28,10 +28,10 @@ Here are some resources with information about how to test your pipelines.
 <ul>
   <!-- Java specific links -->
   <li class="language-java"><a href="{{ site.baseurl }}/blog/2016/10/20/test-stream.html">Testing Unbounded Pipelines in Apache Beam</a> talks about the use of Java classes <a href="{{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/testing/PAssert.html">PAssert</a> and <a href="{{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/testing/TestStream.html">TestStream</a> to test your pipe [...]
-  <li class="language-java">The <a href="{{ site.baseurl }}/get-started/wordcount-example/#testing-your-pipeline-via-passert">Apache Beam WordCount Example</a> contains an example of logging and testing a pipeline with <a href="{{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/testing/PAssert.html"><code>PAssert</code></a>.</li>
+  <li class="language-java">The <a href="{{ site.baseurl }}/get-started/wordcount-example/#testing-your-pipeline-with-asserts">Apache Beam WordCount Walkthrough</a> contains an example of logging and testing a pipeline with <a href="{{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/testing/PAssert.html">PAssert</a>.</li>
 
   <!-- Python specific links -->
-  <li class="language-py">You can use <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/util.py#L76">assert_that</a> to test your pipeline. The Python <a href="https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py">WordCount Debugging Example</a> contains an example of logging and testing with <code>assert_that</code>.</li>
+  <li class="language-py">The <a href="{{ site.baseurl }}/get-started/wordcount-example/#testing-your-pipeline-with-asserts">Apache Beam WordCount Walkthrough</a> contains an example of logging and testing a pipeline with <a href="{{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.testing.util.html#apache_beam.testing.util.assert_that">assert_that</a>.</li>
 </ul>
 
 ## Direct Runner prerequisites and setup
@@ -61,4 +61,12 @@ interface for defaults and additional pipeline configuration options.
 
 ## Additional information and caveats
 
+### Memory considerations
+
 Local execution is limited by the memory available in your local environment. It is highly recommended that you run your pipeline with data sets small enough to fit in local memory. You can create a small in-memory data set using a <span class="language-java">[`Create`]({{ site.baseurl }}/documentation/sdks/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Create.html)</span><span class="language-py">[`Create`](https://github.com/apache/beam/blob/master/sdks/pyt [...]
+
+### Streaming execution
+
+If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.
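
For the Python SDK, a minimal sketch of setting this option, either from the command line or programmatically (assuming the `StandardOptions` view carries the `streaming` flag):

```py
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Equivalent to passing --streaming on the command line.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
```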
+
+
diff --git a/src/documentation/sdks/python-streaming.md b/src/documentation/sdks/python-streaming.md
new file mode 100644
index 0000000..c878364
--- /dev/null
+++ b/src/documentation/sdks/python-streaming.md
@@ -0,0 +1,187 @@
+---
+layout: section
+title: "Apache Beam Python Streaming Pipelines"
+section_menu: section-menu/sdks.html
+permalink: /documentation/sdks/python-streaming/
+---
+
+# Python Streaming Pipelines
+
+Python streaming pipeline execution is experimentally available (with some
+[limitations](#unsupported-features)) starting with Beam SDK version 2.5.0.
+
+
+## Why use streaming execution?
+
+Beam creates an unbounded PCollection if your pipeline reads from a streaming or
+continuously-updating data source (such as Cloud Pub/Sub). A runner must
+process an unbounded PCollection using a streaming job that runs continuously,
+as the entire collection is never available for processing at any one time.
+[Size and boundedness]({{ site.baseurl }}/documentation/programming-guide/#size-and-boundedness)
+has more information about bounded and unbounded collections.
+
+
+## Modifying a pipeline to use stream processing
+
+To modify a batch pipeline to support streaming, you must make the following
+code changes:
+
+* Use an I/O connector that supports reading from an unbounded source.
+* Use an I/O connector that supports writing to an unbounded sink.
+* Choose a [windowing strategy]({{ site.baseurl }}/documentation/programming-guide/index.html#windowing).
+
+The Beam SDK for Python includes two I/O connectors that support unbounded
+PCollections: Google Cloud Pub/Sub (reading and writing) and Google BigQuery
+(writing).
+
+The following snippets show the necessary code changes to modify the batch
+WordCount example to support streaming:
+
+These batch WordCount snippets are from
+[wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py).
+This code uses the TextIO I/O connector to read from and write to a bounded
+collection.
+
+```
+  lines = p | 'read' >> ReadFromText(known_args.input)
+  ...
+
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(six.text_type))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(count_ones))
+  ...
+
+  output = counts | 'format' >> beam.Map(format_result)
+
+  # Write the output using a "Write" transform that has side effects.
+  output | 'write' >> WriteToText(known_args.output)
+```
+
+These streaming WordCount snippets are from
+[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py).
+This code uses an I/O connector that reads from and writes to an unbounded
+source (Cloud Pub/Sub) and specifies a fixed windowing strategy.
+
+```
+  lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
+  ...
+
+  counts = (lines
+            | 'split' >> (beam.ParDo(WordExtractingDoFn())
+                          .with_output_types(six.text_type))
+            | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
+            | beam.WindowInto(window.FixedWindows(15, 0))
+            | 'group' >> beam.GroupByKey()
+            | 'count' >> beam.Map(count_ones))
+
+  ...
+
+  output = counts | 'format' >> beam.Map(format_result)
+
+  # Write to Pub/Sub
+  output | beam.io.WriteStringsToPubSub(known_args.output_topic)
+```
+
+## Running a streaming pipeline
+
+To run the example streaming WordCount pipeline, you must have a Cloud Pub/Sub
+input topic and output topic. To create, subscribe to, and pull from a topic for
+testing purposes, you can use the commands in the [Cloud Pub/Sub quickstart](https://cloud.google.com/pubsub/docs/quickstart-cli).
+
+The following simple bash script feeds lines of an input text file to your input
+topic:
+
+```
+cat <YOUR_LOCAL_TEXT_FILE> | while read line; do gcloud pubsub topics publish <YOUR_INPUT_TOPIC_NAME> "$line"; done
+```
+
+Alternately, you can read from a publicly available Cloud Pub/Sub stream, such
+as `projects/pubsub-public-data/topics/taxirides-realtime`. However, you must
+create your own output topic to test writes.
+
+The following commands run the
+[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py)
+example streaming pipeline. Specify your Cloud Pub/Sub project and input topic
+(`--input_topic`), as well as your output Cloud Pub/Sub project and topic
+(`--output_topic`).
+
+{:.runner-direct}
+```
+# DirectRunner requires the --streaming option
+python -m apache_beam.examples.streaming_wordcount \
+  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
+  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
+  --streaming
+```
+
+{:.runner-apex}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-local}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-cluster}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-spark}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-dataflow}
+```
+# As part of the initial setup, install Google Cloud Platform specific extra components.
+pip install apache-beam[gcp]
+
+# DataflowRunner requires the --streaming option
+python -m apache_beam.examples.streaming_wordcount \
+  --runner DataflowRunner \
+  --project YOUR_GCP_PROJECT \
+  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
+  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
+  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
+  --streaming
+```
+
+Check your runner's documentation for any additional runner-specific information
+about executing streaming pipelines:
+
+- [DirectRunner streaming execution]({{ site.baseurl }}/documentation/runners/direct/#streaming-execution)
+- [DataflowRunner streaming execution]({{ site.baseurl }}/documentation/runners/dataflow/#streaming-execution)
+
+
+## Unsupported features
+
+Python streaming execution does not currently support the following features.
+
+### General Beam features
+
+These unsupported Beam features apply to all runners.
+
+- `State` and `Timers` APIs
+- Custom source API
+- Splittable `DoFn` API
+- Handling of late data
+- User-defined custom `WindowFn`
+
+### DataflowRunner-specific features
+
+Additionally, `DataflowRunner` does not currently support the following Cloud
+Dataflow-specific features with Python streaming execution.
+
+- Streaming autoscaling
+- Updating existing pipelines
+- Cloud Dataflow Templates
+- Some monitoring features, such as msec counters, display data, metrics, and
+  element counts for transforms. However, logging, watermarks, and element
+  counts for sources are supported.
+
+
diff --git a/src/documentation/sdks/python.md b/src/documentation/sdks/python.md
index 46af4cc..9a26db2 100644
--- a/src/documentation/sdks/python.md
+++ b/src/documentation/sdks/python.md
@@ -6,22 +6,28 @@ permalink: /documentation/sdks/python/
 ---
 # Apache Beam Python SDK
 
-The Python SDK for Apache Beam provides a simple, powerful API for building batch data processing pipelines in Python.
+The Python SDK for Apache Beam provides a simple, powerful API for building batch and streaming data processing pipelines.
 
-## Get Started with the Python SDK
+## Get started with the Python SDK
 
-Get started with the [Beam Programming Guide]({{ site.baseurl }}/documentation/programming-guide) to learn the basic concepts that apply to all SDKs in Beam. Then, follow the [Beam Python SDK Quickstart]({{ site.baseurl }}/get-started/quickstart-py) to set up your Python development environment, get the Beam SDK for Python, and run an example pipeline.
+Get started with the [Beam Python SDK quickstart]({{ site.baseurl }}/get-started/quickstart-py) to set up your Python development environment, get the Beam SDK for Python, and run an example pipeline. Then, read through the [Beam programming guide]({{ site.baseurl }}/documentation/programming-guide) to learn the basic concepts that apply to all SDKs in Beam.
 
-See the [Python API Reference]({{ site.baseurl }}/documentation/sdks/pydoc/) for more information on individual APIs.
+See the [Python API reference]({{ site.baseurl }}/documentation/sdks/pydoc/) for more information on individual APIs.
 
-## Python Type Safety
+## Python streaming pipelines
+
+Python [streaming pipeline execution]({{ site.baseurl }}/documentation/sdks/python-streaming)
+is experimentally available (with some [limitations]({{ site.baseurl }}/documentation/sdks/python-streaming/#unsupported-features))
+starting with Beam SDK version 2.5.0.
+
+## Python type safety
 
 Python is a dynamically-typed language with no static type checking. The Beam SDK for Python uses type hints during pipeline construction and runtime to try to emulate the correctness guarantees achieved by true static typing. [Ensuring Python Type Safety]({{ site.baseurl }}/documentation/sdks/python-type-safety) walks through how to use type hints, which help you to catch potential bugs up front with the [Direct Runner]({{ site.baseurl }}/documentation/runners/direct/).
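
For a flavor of what those hints look like, here is a small sketch (the linked guide is the canonical reference); it assumes the chainable `with_input_types`/`with_output_types` methods on transforms:

```py
import apache_beam as beam

with beam.Pipeline() as p:
  lengths = (
      p
      | beam.Create(['apple', 'pear'])
      # Declare that this Map consumes strings and produces ints so the type
      # checker can flag mismatched downstream transforms during construction.
      | beam.Map(len).with_input_types(str).with_output_types(int))
```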
 
-## Managing Python Pipeline Dependencies
+## Managing Python pipeline dependencies
 
 When you run your pipeline locally, the packages that your pipeline depends on are available because they are installed on your local machine. However, when you want to run your pipeline remotely, you must make sure these dependencies are available on the remote machines. [Managing Python Pipeline Dependencies]({{ site.baseurl }}/documentation/sdks/python-pipeline-dependencies) shows you how to make your dependencies available to the remote workers.
 
-## Creating New Sources and Sinks
+## Creating new sources and sinks
 
 The Beam SDK for Python provides an extensible API that you can use to create new data sources and sinks. [Creating New Sources and Sinks with the Python SDK]({{ site.baseurl }}/documentation/sdks/python-custom-io) shows how to create new sources and sinks using [Beam's Source and Sink API](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/iobase.py).
diff --git a/src/get-started/quickstart-java.md b/src/get-started/quickstart-java.md
index 0a5b045..bf21b60 100644
--- a/src/get-started/quickstart-java.md
+++ b/src/get-started/quickstart-java.md
@@ -104,6 +104,8 @@ $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
 
 {:.runner-dataflow}
 ```
+Make sure you complete the setup steps at https://beam.apache.org/documentation/runners/dataflow/#setup
+
 $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--runner=DataflowRunner --project=<your-gcp-project> \
                   --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
@@ -141,7 +143,6 @@ $ ls /tmp/counts*
 $ ls counts*
 ```
 
-
 {:.runner-dataflow}
 ```
 $ gsutil ls gs://<your-gcs-bucket>/counts*
diff --git a/src/get-started/quickstart-py.md b/src/get-started/quickstart-py.md
index 4f5323a..cca95a0 100644
--- a/src/get-started/quickstart-py.md
+++ b/src/get-started/quickstart-py.md
@@ -30,16 +30,28 @@ Install [pip](https://pip.pypa.io/en/stable/installing/), Python's package manag
 pip --version
 ```
 
+If you do not have `pip` version 7.0.0 or newer, run the following command to
+install it. This command might require administrative privileges.
+
+```
+pip install --upgrade pip
+```
+
+
 ### Install Python virtual environment
 
 It is recommended that you install a [Python virtual environment](http://docs.python-guide.org/en/latest/dev/virtualenvs/)
-for initial experiments. If you do not have `virtualenv` version 13.1.0 or newer, install it by running:
+for initial experiments. If you do not have `virtualenv` version 13.1.0 or
+newer, run the following command to install it. This command might require
+administrative privileges.
 
 ```
 pip install --upgrade virtualenv
 ```
 
-If you do not want to use a Python virtual environment (not recommended), ensure `setuptools` is installed on your machine. If you do not have `setuptools` version 17.1 or newer, install it by running:
+If you do not want to use a Python virtual environment (not recommended), ensure
+`setuptools` is installed on your machine. If you do not have `setuptools`
+version 17.1 or newer, run the following command to install it.
 
 ```
 pip install --upgrade setuptools
@@ -77,9 +89,9 @@ Install the latest Python SDK from PyPI:
 pip install apache-beam
 ```
 
-#### Extra Requirements
+#### Extra requirements
 
-The above installation will not install all the extra dependencies for using features like the Google Cloud Dataflow runner. Information on what extra packages are required for different features are highlighted below. It is possible to install multitple extra requirements using something like `pip install apache-beam[feature1, feature2]`.
+The above installation will not install all the extra dependencies for using features like the Google Cloud Dataflow runner. Information on what extra packages are required for different features is highlighted below. It is possible to install multiple extra requirements using something like `pip install apache-beam[feature1,feature2]`.
 
 - **Google Cloud Platform**
   - Installation Command: `pip install apache-beam[gcp]`
@@ -95,20 +107,41 @@ The above installation will not install all the extra dependencies for using fea
   - Installation Command: `pip install apache-beam[docs]`
   - Generating API documentation using Sphinx
 
-## Execute a pipeline locally
+## Execute a pipeline
 
 The Apache Beam [examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples) directory has many examples. All examples can be run locally by passing the required arguments described in the example script.
 
-For example, to run `wordcount.py`, run:
+For example, run `wordcount.py` with the following command:
 
 {:.runner-direct}
 ```
-python -m apache_beam.examples.wordcount --input <PATH_TO_INPUT_FILE> --output counts
+python -m apache_beam.examples.wordcount --input /path/to/inputfile --output /path/to/write/counts
+```
+
+{:.runner-apex}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-local}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-cluster}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-spark}
+```
+This runner is not yet available for the Python SDK.
 ```
 
 {:.runner-dataflow}
 ```
-# As part of the initial setup, install Google Cloud Platform specific extra components.
+# As part of the initial setup, install Google Cloud Platform specific extra components. Make sure you
+# complete the setup steps at https://beam.apache.org/documentation/runners/dataflow/#setup
 pip install apache-beam[gcp]
 python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
                                          --output gs://<your-gcs-bucket>/counts \
@@ -117,6 +150,11 @@ python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespea
                                          --temp_location gs://<your-gcs-bucket>/tmp/
 ```
 
+After the pipeline completes, you can view the output files at your specified
+output path. For example, if you specify `/dir1/counts` for the `--output`
+parameter, the pipeline writes the files to `/dir1/` and names the files
+sequentially in the format `counts-0000-of-0001`.
+
 ## Next Steps
 
 * Learn more about the [Beam SDK for Python]({{ site.baseurl }}/documentation/sdks/python/)
diff --git a/src/get-started/wordcount-example.md b/src/get-started/wordcount-example.md
index e24fa07..a334915 100644
--- a/src/get-started/wordcount-example.md
+++ b/src/get-started/wordcount-example.md
@@ -40,62 +40,31 @@ continue on to learn more concepts in the other examples.
 
 ## MinimalWordCount example
 
-MinimalWordCount demonstrates a simple pipeline that can read from a text file,
-apply transforms to tokenize and count the words, and write the data to an
-output text file. This example hard-codes the locations for its input and output
-files and doesn't perform any error checking; it is intended to only show you
-the "bare bones" of creating a Beam pipeline. This lack of parameterization
-makes this particular pipeline less portable across different runners than
-standard Beam pipelines. In later examples, we will parameterize the pipeline's
-input and output sources and show other best practices.
+MinimalWordCount demonstrates a simple pipeline that uses the Direct Runner to
+read from a text file, apply transforms to tokenize and count the words, and
+write the data to an output text file.
+
+{:.language-java}
+This example hard-codes the locations for its input and output files and doesn't
+perform any error checking; it is intended to only show you the "bare bones" of
+creating a Beam pipeline. This lack of parameterization makes this particular
+pipeline less portable across different runners than standard Beam pipelines. In
+later examples, we will parameterize the pipeline's input and output sources and
+show other best practices.
 
-**To run this example in Java:**
-
-```
+```java
 $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount
 ```
 
-To view the full code in Java, see
-**[MinimalWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java).**
-
-**To run this example in Python:**
-
-{:.runner-direct}
-```
-python -m apache_beam.examples.wordcount_minimal --input README.md --output counts
-```
-
-{:.runner-apex}
-```
-This runner is not yet available for the Python SDK.
-```
-
-{:.runner-flink-local}
-```
-This runner is not yet available for the Python SDK.
-```
-
-{:.runner-flink-cluster}
-```
-This runner is not yet available for the Python SDK.
-```
-
-{:.runner-spark}
-```
-This runner is not yet available for the Python SDK.
+```py
+python -m apache_beam.examples.wordcount_minimal --input YOUR_INPUT_FILE --output counts
 ```
 
-{:.runner-dataflow}
-```
-# As part of the initial setup, install Google Cloud Platform specific extra components.
-pip install apache-beam[gcp]
-python -m apache_beam.examples.wordcount_minimal --input gs://dataflow-samples/shakespeare/kinglear.txt \
-                                                 --output gs://<your-gcs-bucket>/counts \
-                                                 --runner DataflowRunner \
-                                                 --project your-gcp-project \
-                                                 --temp_location gs://<your-gcs-bucket>/tmp/
-```
+{:.language-java}
+To view the full code in Java, see
+**[MinimalWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java).**
 
+{:.language-py}
 To view the full code in Python, see
 **[wordcount_minimal.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_minimal.py).**
 
@@ -131,19 +100,6 @@ sections, we will specify the pipeline's runner.
  // will run with the DirectRunner by default, based on the class path configured
  // in its dependencies.
  PipelineOptions options = PipelineOptionsFactory.create();
-
-    // In order to run your pipeline, you need to make following runner specific changes:
-    //
-    // CHANGE 1/3: Select a Beam runner, such as DataflowRunner or FlinkRunner.
-    // CHANGE 2/3: Specify runner-required options.
-    // For DataflowRunner, set project and temp location as follows:
-    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
-    //   dataflowOptions.setRunner(DataflowRunner.class);
-    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
-    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
-    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
-    // for more details.
-    //   options.setRunner(FlinkRunner.class);
 ```
 
 ```py
@@ -194,9 +150,9 @@ The MinimalWordCount pipeline contains five transforms:
     {% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py tag:examples_wordcount_minimal_read
     %}```
 
-2.  This transform splits the lines in PCollection<String>, where each element
+2.  This transform splits the lines in `PCollection<String>`, where each element
     is an individual word in Shakespeare's collected texts.
-    As an alternative, it would have been possible to use a 
+    As an alternative, it would have been possible to use a
     [ParDo]({{ site.baseurl }}/documentation/programming-guide/#pardo)
     transform that invokes a `DoFn` (defined in-line as an anonymous class) on
     each element that tokenizes the text lines into individual words. The input
@@ -335,8 +291,8 @@ $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
 {:.runner-dataflow}
 ```
 $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
-     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
-                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
+     -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
+                  --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
      -Pdataflow-runner
 ```
 
@@ -347,7 +303,7 @@ To view the full code in Java, see
 
 {:.runner-direct}
 ```
-python -m apache_beam.examples.wordcount --input README.md --output counts
+python -m apache_beam.examples.wordcount --input YOUR_INPUT_FILE --output counts
 ```
 
 {:.runner-apex}
@@ -375,10 +331,10 @@ This runner is not yet available for the Python SDK.
 # As part of the initial setup, install Google Cloud Platform specific extra components.
 pip install apache-beam[gcp]
 python -m apache_beam.examples.wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
-                                         --output gs://<your-gcs-bucket>/counts \
+                                         --output gs://YOUR_GCS_BUCKET/counts \
                                          --runner DataflowRunner \
-                                         --project your-gcp-project \
-                                         --temp_location gs://<your-gcs-bucket>/tmp/
+                                         --project YOUR_GCP_PROJECT \
+                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
 ```
 
 To view the full code in Python, see
@@ -554,7 +510,7 @@ To view the full code in Java, see
 
 {:.runner-direct}
 ```
-python -m apache_beam.examples.wordcount_debugging --input README.md --output counts
+python -m apache_beam.examples.wordcount_debugging --input YOUR_INPUT_FILE --output counts
 ```
 
 {:.runner-apex}
@@ -582,10 +538,10 @@ This runner is not yet available for the Python SDK.
 # As part of the initial setup, install Google Cloud Platform specific extra components.
 pip install apache-beam[gcp]
 python -m apache_beam.examples.wordcount_debugging --input gs://dataflow-samples/shakespeare/kinglear.txt \
-                                         --output gs://<your-gcs-bucket>/counts \
+                                         --output gs://YOUR_GCS_BUCKET/counts \
                                          --runner DataflowRunner \
-                                         --project your-gcp-project \
-                                         --temp_location gs://<your-gcs-bucket>/tmp/
+                                         --project YOUR_GCP_PROJECT \
+                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
 ```
 
 To view the full code in Python, see
@@ -677,18 +633,23 @@ or DEBUG significantly increases the amount of logs output.
 > **Note:** This section is yet to be added. There is an open issue for this
 > ([BEAM-2285](https://issues.apache.org/jira/browse/BEAM-2285)).
 
-### Testing your pipeline via PAssert
+### Testing your pipeline with asserts
 
-`PAssert` is a set of convenient PTransforms in the style of Hamcrest's
-collection matchers that can be used when writing Pipeline level tests to
-validate the contents of PCollections. `PAssert` is best used in unit tests with
-small data sets, but is demonstrated here as a teaching tool.
+<span class="language-java">`PAssert`</span><span class="language-py">`assert_that`</span>
+is a set of convenient PTransforms in the style of Hamcrest's collection
+matchers that can be used when writing pipeline level tests to validate the
+contents of PCollections. Asserts are best used in unit tests with small data
+sets.
 
-Below, we verify that the set of filtered words matches our expected counts.
-Note that `PAssert` does not produce any output, and the pipeline only succeeds
-if all of the expectations are met. See
-[DebuggingWordCountTest](https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java)
-for an example unit test.
+{:.language-java}
+The following example verifies that the set of filtered words matches our
+expected counts. The assert does not produce any output, and the pipeline only
+succeeds if all of the expectations are met.
+
+{:.language-py}
+The following example verifies that two collections contain the same values. The
+assert does not produce any output, and the pipeline only succeeds if all of the
+expectations are met.
 
 ```java
 public static void main(String[] args) {
@@ -702,9 +663,18 @@ public static void main(String[] args) {
 ```
 
 ```py
-# This feature is not yet available in the Beam SDK for Python.
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+with TestPipeline() as p:
+  assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3]))
 ```
 
+{:.language-java}
+See [DebuggingWordCountTest](https://github.com/apache/beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java)
+for an example unit test.
+
+
 ## WindowedWordCount example
 
 The WindowedWordCount example counts words in text just as the previous
@@ -758,15 +728,58 @@ $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCo
 {:.runner-dataflow}
 ```
 $ mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WindowedWordCount \
-   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://<your-gcs-bucket>/tmp \
-                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://<your-gcs-bucket>/counts" \
+   -Dexec.args="--runner=DataflowRunner --gcpTempLocation=gs://YOUR_GCS_BUCKET/tmp \
+                --inputFile=gs://apache-beam-samples/shakespeare/* --output=gs://YOUR_GCS_BUCKET/counts" \
      -Pdataflow-runner
 ```
 
 To view the full code in Java, see
 **[WindowedWordCount](https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java).**
 
-> **Note:** WindowedWordCount is not yet available for the Python SDK.
+**To run this example in Python:**
+
+This pipeline writes its results to a BigQuery table. The `--output_table`
+parameter specifies the table using the format `PROJECT:DATASET.TABLE` or
+`DATASET.TABLE`.
+
+{:.runner-direct}
+```
+python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
+```
+
+{:.runner-apex}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-local}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-cluster}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-spark}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-dataflow}
+```
+# As part of the initial setup, install Google Cloud Platform specific extra components.
+pip install apache-beam[gcp]
+python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE \
+                                         --output_table PROJECT:DATASET.TABLE \
+                                         --runner DataflowRunner \
+                                         --project YOUR_GCP_PROJECT \
+                                         --temp_location gs://YOUR_GCS_BUCKET/tmp/
+```
+
+To view the full code in Python, see
+**[windowed_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/windowed_wordcount.py).**
 
 ### Unbounded and bounded pipeline input modes
 
@@ -890,24 +903,104 @@ PCollection<KV<String, Long>> wordCounts = windowedWords.apply(new WordCount.Cou
 # This feature is not yet available in the Beam SDK for Python.
 ```
 
-### Writing results to an unbounded sink
+## StreamingWordCount example
 
-When our input is unbounded, the same is true of our output `PCollection`. We
-need to make sure that we choose an appropriate, unbounded sink. Some output
-sinks support only bounded output, while others support both bounded and
-unbounded outputs. By using a `FilenamePolicy`, we can use `TextIO` to files
-that are partitioned by windows. We use a composite `PTransform` that uses such
-a policy internally to write a single sharded file per window.
+The StreamingWordCount example is a streaming pipeline that reads Pub/Sub
+messages from a Pub/Sub subscription or topic, and performs a frequency count on
+the words in each message. Similar to WindowedWordCount, this example applies
+fixed-time windowing, wherein each window represents a fixed time interval. The
+fixed window size for this example is 15 seconds. The pipeline outputs the
+frequency count of the words seen in each 15 second window.
 
-In this example, we stream the results to Google BigQuery. The code formats the
-results and writes them to a BigQuery table using `BigQueryIO.Write`.
+**New Concepts:**
+
+* Reading an unbounded data set
+* Writing unbounded results
+
+**To run this example in Java:**
+
+> **Note:** StreamingWordCount is not yet available for the Java SDK.
+
+**To run this example in Python:**
+
+{:.runner-direct}
+```
+python -m apache_beam.examples.streaming_wordcount \
+  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
+  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
+  --streaming
+```
+
+{:.runner-apex}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-local}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-flink-cluster}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-spark}
+```
+This runner is not yet available for the Python SDK.
+```
+
+{:.runner-dataflow}
+```
+# As part of the initial setup, install Google Cloud Platform specific extra components.
+pip install apache-beam[gcp]
+python -m apache_beam.examples.streaming_wordcount \
+  --runner DataflowRunner \
+  --project YOUR_GCP_PROJECT \
+  --temp_location gs://YOUR_GCS_BUCKET/tmp/ \
+  --input_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_INPUT_TOPIC" \
+  --output_topic "projects/YOUR_PUBSUB_PROJECT_NAME/topics/YOUR_OUTPUT_TOPIC" \
+  --streaming
+```
+
+To view the full code in Python, see
+**[streaming_wordcount.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/streaming_wordcount.py).**
+
+
+### Reading an unbounded data set
+
+This example uses an unbounded data set as input. The code reads Pub/Sub
+messages from a Pub/Sub subscription or topic using
+[`beam.io.ReadStringsFromPubSub`]({{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadStringsFromPubSub).
 
 ```java
-  wordCounts
-      .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-      .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
+  // This example is not currently available for the Beam SDK for Java.
 ```
+```py
+  # Read from Pub/Sub into a PCollection.
+  if known_args.input_subscription:
+    lines = p | beam.io.ReadStringsFromPubSub(
+        subscription=known_args.input_subscription)
+  else:
+    lines = p | beam.io.ReadStringsFromPubSub(topic=known_args.input_topic)
+```
+### Writing unbounded results
+
+When the input is unbounded, the same is true of the output `PCollection`. As
+such, you must make sure to choose an appropriate I/O for the results. Some I/Os
+support only bounded output, while others support both bounded and unbounded
+outputs.
+
+This example uses an unbounded `PCollection` and streams the results to
+Google Pub/Sub. The code formats the results and writes them to a Pub/Sub topic
+using [`beam.io.WriteStringsToPubSub`]({{ site.baseurl }}/documentation/sdks/pydoc/{{ site.release_latest }}/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.WriteStringsToPubSub).
 
+```java
+  // This example is not currently available for the Beam SDK for Java.
+```
 ```py
-# This feature is not yet available in the Beam SDK for Python.
+  # Write to Pub/Sub
+  output | beam.io.WriteStringsToPubSub(known_args.output_topic)
 ```
+
