Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/07 05:19:23 UTC

[GitHub] [beam] chamikaramj opened a new pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

chamikaramj opened a new pull request #12188:
URL: https://github.com/apache/beam/pull/12188


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-660271254


   Run Python PreCommit





[GitHub] [beam] davidcavazos commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r457726212



##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)

Review comment:
       Is there any reason to link to the proprietary version instead of the open source version? If there are no incompatibilities, I would suggest using the [open source version](https://adoptopenjdk.net/?variant=openjdk11&jvmVariant=openj9).

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)
+on your system and make sure that the `JAVA_HOME` environment variable points to
+your JDK installation. Make sure that the `java` command is available in
+the environment.
+
+```sh
+java -version
+<Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to set up the Kafka cluster in
+[GCE](https://cloud.google.com/compute). See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions on setting up a single-node Kafka cluster in GCE.
+When using Dataflow, consider starting the Kafka cluster in the region where
+the Dataflow pipeline will be running. See
+[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart)
+of the Kafka cluster is `123.45.67.89:123` and the port is `9092`.
+
+```sh
+export BOOTSTRAP_SERVER="123.45.67.89:123:9092"
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
+See [here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions for setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag when installing Beam.
+
+```sh
+python -m venv env
+source env/bin/activate
+pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+Note that this example is not available in Beam versions before 2.24.0, hence

Review comment:
       Nit: can we make this a quote block with an information sign?
   
   ```
   > ℹ️ Note that this ...
   ```

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)
+on your system and make sure that the `JAVA_HOME` environment variable points to
+your JDK installation. Make sure that the `java` command is available in
+the environment.
+
+```sh
+java -version
+<Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to set up the Kafka cluster in
+[GCE](https://cloud.google.com/compute). See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions on setting up a single-node Kafka cluster in GCE.
+When using Dataflow, consider starting the Kafka cluster in the region where
+the Dataflow pipeline will be running. See
+[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart)
+of the Kafka cluster is `123.45.67.89:123` and the port is `9092`.
+
+```sh
+export BOOTSTRAP_SERVER="123.45.67.89:123:9092"
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
+See [here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions for setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag when installing Beam.
+
+```sh
+python -m venv env
+source env/bin/activate
+pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1" 
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="5"
+
+python -m apache_beam.examples.kafkataxi.kafka_taxi \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --bootstrap_servers $BOOTSTRAP_SERVER \
+  --experiments=use_runner_v2
+```
+
+## *(Optional)* Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+git clone git@github.com:${GITHUB_USERNAME}/beam
+cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to [Docker](https://www.docker.com/get-started)
+Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information.
+
+```sh
+export DOCKER_ROOT="Your Docker Repository Root"
+./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above command, just build the
+Java SDK harness container locally using the default values for the repository root
+and the Docker tag.
+
+Activate your Python virtual environment. This example uses `venv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions on setting up other types of Python virtual environments.
+
+```sh
+cd ..  # Creating the virtual environment in the top level work directory.
+python -m venv env
+source env/bin/activate
+```
+
+Install Beam and dependencies and build a Beam distribution.
+
+```sh
+cd beam/sdks/python
+pip install -r build-requirements.txt
+pip install -e '.[gcp]'
+python setup.py sdist
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or specify
+a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+See [here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1" 
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export export NUM_WORKERS="5"

Review comment:
       Typo: extra `export`

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the wordcount pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',

Review comment:
       Got it, sounds good to me.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'

Review comment:
       Nit: for consistency with the README it would be nice to use the same "123.45.67.89:123:9092" example :)

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',

Review comment:
       Nit: it would read nicer if every parameter was in its own line, it would only add 2 extra lines
   
   ```py
   # pipeline_args = ['--project', 'my-project',
   #                  '--runner', 'DataflowRunner',
   # ...
   ```
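
A possible sketch of how the `bootstrap_servers`, `topic` and `pipeline_args` parameters of `run` could be filled in from the command line, with one option per line as suggested above. The helper name `parse_args`, the `localhost:9092` address and the use of `kafka_taxirides_realtime` as a default are assumptions for illustration; only the standard-library `argparse` module is used, and this is not necessarily how the PR wires things up.

```python
import argparse


def parse_args(argv):
  """Splits argv into example-specific values and remaining pipeline args.

  Hypothetical helper: the option names mirror the parameters of `run`.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      required=True,
      help='Comma-separated host:port pairs of Kafka bootstrap servers.')
  parser.add_argument(
      '--topic',
      default='kafka_taxirides_realtime',  # assumed default topic name
      help='Kafka topic to write to and read from.')
  # Options that argparse does not recognize are returned untouched, so they
  # can be handed to PipelineOptions by the caller.
  known_args, pipeline_args = parser.parse_known_args(argv)
  return known_args.bootstrap_servers, known_args.topic, pipeline_args


# One option per line keeps the list easy to scan.
example_argv = [
    '--bootstrap_servers', 'localhost:9092',  # placeholder address
    '--project', 'my-project',
    '--runner', 'DataflowRunner',
    '--temp_location', 'my-temp-location',
    '--region', 'my-region',
]

bootstrap_servers, topic, pipeline_args = parse_args(example_argv)
```

With this split, `bootstrap_servers` and `topic` stay explicit arguments of `run`, while everything else is passed through as `pipeline_args`.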

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',
+  #                  '--temp_location', 'my-temp-location',
+  #                  '--region', 'my-region', '--num_workers',
+  #                  'my-num-workers', '--experiments', 'use_runner_v2']
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+  window_size = 15  # size of the Window in seconds.
+
+  def log_ride(ride_bytes):
+    # Converting bytes record from Kafka to a dictionary.
+    import ast
+    ride = ast.literal_eval(ride_bytes.decode("UTF-8"))
+    logging.info(
+        'Found ride at latitude %r and longitude %r with %r '
+        'passengers',
+        ride['latitude'],
+        ride['longitude'],
+        ride['passenger_count'])
+
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    _ = (
+        pipeline
+        | beam.io.ReadFromPubSub(
+            topic='projects/pubsub-public-data/topics/taxirides-realtime').
+        with_output_types(bytes)
+        | beam.Map(lambda x: (b'', x)).with_output_types(
+            typing.Tuple[bytes, bytes])  # Kafka write transforms expects KVs.
+        | beam.WindowInto(beam.window.FixedWindows(window_size))
+        | WriteToKafka(
+            producer_config={'bootstrap.servers': bootstrap_servers},
+            topic=topic))
+
+    _ = (
+        pipeline
+        | ReadFromKafka(
+            consumer_config={'bootstrap.servers': bootstrap_servers},
+            topics=[topic])
+        | beam.FlatMap(lambda kv: log_ride(kv[1])))
+
+    pipeline.run().wait_until_finish()

Review comment:
       When using the [`with` statement](https://docs.python.org/3/reference/compound_stmts.html#with), this line is no longer necessary; the pipeline is run by the `__exit__` method of `Pipeline`.
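
The point of this comment can be illustrated with a minimal sketch of the context-manager pattern. `SketchPipeline` is a hypothetical stand-in written for this illustration, not Beam's actual `Pipeline` class; it only assumes that `__exit__` runs the pipeline and waits for it when the body raised no exception, as the comment describes.

```python
class SketchPipeline:
    """Hypothetical stand-in for a pipeline object; not Beam's implementation."""

    def __init__(self):
        self.run_count = 0

    def run(self):
        # Count how many times the pipeline is started.
        self.run_count += 1
        return self

    def wait_until_finish(self):
        return "DONE"

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Run only when the body of the `with` block finished without raising.
        if exc_type is None:
            self.run().wait_until_finish()
        return False  # never swallow exceptions


with SketchPipeline() as pipeline:
    pass  # transforms would be applied here

# The pipeline was started once, on leaving the `with` block.
print(pipeline.run_count)
```

Under this assumption, an explicit `pipeline.run().wait_until_finish()` inside the block would start the pipeline a second time, which is why the line can be dropped.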

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',
+  #                  '--temp_location', 'my-temp-location',
+  #                  '--region', 'my-region', '--num_workers',
+  #                  'my-num-workers', '--experiments', 'use_runner_v2']
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       It would be nice to use the [`kwargs` version](https://github.com/apache/beam/pull/11851/files#diff-f1f4eac9f31676456be8094e23107212R247-R253); it's shorter and easier to read and write.
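
A sketch of what the keyword-argument form might look like for the options set in this diff. The helper name `build_options` is hypothetical and introduced only for illustration; the sketch assumes that `PipelineOptions` accepts option values as keyword arguments alongside the flag list.

```python
def build_options(pipeline_args):
    # Imported inside the function so the sketch can be loaded without Beam
    # installed; real code would normally import at module level.
    from apache_beam.options.pipeline_options import PipelineOptions

    # Keyword arguments set the option values directly, standing in for the
    # two view_as(...) assignments shown in the diff above.
    return PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)
```

With this form, `run()` would call `build_options(pipeline_args)` once instead of constructing the options and then mutating two views of them.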




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-660243968


   Retest this please





[GitHub] [beam] davidcavazos commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-655640853


   Should we have a test for this sample as well?





[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-660358023


   Run Python PreCommit





[GitHub] [beam] chamikaramj merged pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj merged pull request #12188:
URL: https://github.com/apache/beam/pull/12188


   





[GitHub] [beam] davidcavazos commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r451817171



##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version

Review comment:
       Can we get rid of the initial `>` and the "output"? It makes it trickier to copy-paste the command.
   
   ```sh
   java --version
   ```
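
For readers who prefer to script the prerequisite check, here is a minimal sketch using only the Python standard library. The helper name `java_version_text` is hypothetical. It calls `java -version` rather than `java --version` on the assumption that older JVMs (Java 8) accept only the single-dash form.

```python
import shutil
import subprocess


def java_version_text():
    """Return the text printed by `java -version`, or None if java is not on PATH."""
    java = shutil.which('java')
    if java is None:
        return None
    # `java -version` writes to stderr; fall back to stdout just in case.
    result = subprocess.run(
        [java, '-version'], capture_output=True, text=True, check=False)
    return (result.stderr or result.stdout).strip()


print(java_version_text() or 'java not found on PATH')
```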

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to setup a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is 
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092

Review comment:
       Can we get rid of the `>` as well?
   
   For this command example, we could assume it's running locally. Someone not familiar with IP addresses might not know what `KAFKA_ADDRESS` means. We could mention that if you're running it in a distributed environment you'll need to replace the address with its public IP address.
   
   ```sh
   export BOOTSTRAP_SERVER=127.0.0.1:9092
   ```
   
   > **[edit]**: after looking below, it looks like this guide's instructions are written only for Dataflow. If that's the case, running Kafka locally won't work, since Dataflow needs to reach the Kafka address. Should we take out any mention of running it locally and just assume users will run it in a distributed environment?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 

Review comment:
       Can we link to https://adoptopenjdk.net/?variant=openjdk11&jvmVariant=openj9 for installing Java?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```

Review comment:
       Do we need anything besides java like maven or gradle?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to setup a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is 
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+(runner V2)[https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2]
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \

Review comment:
       Can we get rid of the `>` from here as well?
   
   I would highly suggest breaking this command into separate lines for readability.
   
   It would also be a good idea to set variables beforehand so that once they set those variables, they can copy-paste the rest of the command.
   
   I would link to the [Dataflow regional endpoints docs](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints) for more information about the `--region`.
   
   ```sh
   PROJECT="$(gcloud config get-value project)"
   TEMP_LOCATION="gs://MY-BUCKET/temp"
   REGION="us-central1"
   JOB_NAME="kafka-taxi-$(date +%Y%m%d-%H%M%S)"
   BOOTSTRAP_SERVER="123.45.67.89:123"
   
   python -m apache_beam.examples.kafkataxi.kafka_taxi \
     --runner "DataflowRunner" \
     --project "$PROJECT" \
     --temp_location "$TEMP_LOCATION" \
     --region "$REGION" \
     --num_workers 1 \
     --job_name "$JOB_NAME" \
     --bootstrap_servers "$BOOTSTRAP_SERVER" \
     --experiments "use_runner_v2"
   ```
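
On the Python side, the flags in this command have to be split between the example's own arguments and the options forwarded to the runner. The following is a minimal sketch of one way to do that with `argparse`. The `split_args` helper and the `--topic` flag are assumptions made for illustration; `--bootstrap_servers` and the default topic name are taken from the command and the diff quoted in this thread.

```python
import argparse


def split_args(argv):
    """Separate the example's own flags from the flags meant for the runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', required=True)
    # Hypothetical flag; the default matches the topic named in the diff.
    parser.add_argument('--topic', default='kafka_taxirides_realtime')
    # Flags the parser does not recognize are returned in their original order.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args.bootstrap_servers, known_args.topic, pipeline_args


bootstrap_servers, topic, pipeline_args = split_args([
    '--runner', 'DataflowRunner',
    '--bootstrap_servers', '123.45.67.89:123',
    '--num_workers', '1',
])
```

The three values line up with the parameters of `run(bootstrap_servers, topic, pipeline_args)` in the example under review.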

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to setup a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is 
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+(runner V2)[https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2]
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Checkout a clone of the Beam Git repo. See 
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your Github username to be `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a java SDK Harness container to Docker Hub. See 
+[here](https://beam.apache.org/documentation/runtime/environments/) for 
+prerequisites and additional information. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root to be `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>

Review comment:
       Can we get rid of the `>` from these as well?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to setup a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is 
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+(runner V2)[https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2]

Review comment:
       Typo on link? `(short name)[url]` -> `[short name](url)`

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to setup a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is 
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+(runner V2)[https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2]
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. The following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+the information within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone

Review comment:
       I would put an `(Optional)` note at the beginning of the header to be explicit that it's safe to skip this section if they're not interested in running from a git clone.
   
   ```md
   ## *(Optional)* Running the Example from a Beam Git Clone
   ```

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines on a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step-by-step instructions.
+
+Let's assume the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. The commands below assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. The following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+the information within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Checkout a clone of the Beam Git repo. See 
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your Github username to be `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build

Review comment:
       Can we get rid of the `>` from these as well?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+Set up a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. The commands below assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work

Review comment:
       Can we remove the `>` from here as well?
   
   In general, we can assume users will run these commands in their working directory of choice, so I would say we can safely omit the `export WORK_DIRECTORY` and `cd $WORK_DIRECTORY` instructions.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Checkout a clone of the Beam Git repo. See 
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your Github username to be `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY

Review comment:
       Can we get rid of the `>` from these as well?
   
   I think it's also safe to skip the `WORK_DIRECTORY` step.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 

Review comment:
       Can we link to http://kafka.apache.org/quickstart ?

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a java SDK Harness container to Docker Hub. See 
+[here](https://beam.apache.org/documentation/runtime/environments/) for 
+prerequisites and additional information. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root to be `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>
+> ./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+> docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of above command just build the
+Java SDK harness container locally using the default values for repository root
+and the docker tag.
+
+Activate your Python virtual environment.  This example uses `virtualenv`. See 
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions regarding setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY

Review comment:
       Same as the previous section

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+Set up a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. The commands below assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env

Review comment:
       As of Python 3, the [recommended way to create virtual environments](https://docs.python.org/3/tutorial/venv.html) is through `venv` which already comes preinstalled with Python.
   
   I would also use the more "standard" name `env` for the virtualenv; it's more familiar and a good assumption for most tools.
   
   In [`zsh`](https://ohmyz.sh/), `pip install -e .[gcp]` is interpreted differently because the square brackets are treated as a glob pattern, so it's a good idea to surround `.[gcp]` with quotes.
   
   Also, is there any reason to install `-e '.[gcp]'` instead of `-U 'apache-beam[gcp]'`?
   
   ```sh
   python -m venv env
   source env/bin/activate
   pip install -e '.[gcp]'
   ```
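
To illustrate the quoting point above, here is a minimal Python sketch of why an unquoted `.[gcp]` is fragile. It uses the standard-library `fnmatch` module as a stand-in for shell globbing, which is an approximation and not zsh's actual implementation. The name `EXTRAS_ARG` is made up for this illustration. With its default settings, zsh reports an error when a glob pattern matches no files, whereas bash passes the unmatched pattern through unchanged.

```python
import fnmatch
import shlex

# The unquoted pip argument from the README; the brackets make it a glob pattern.
EXTRAS_ARG = ".[gcp]"

# As a glob, "[gcp]" is a character class matching exactly one of g, c, or p,
# so the pattern matches a file name such as ".g" but not its own literal text.
matches_dot_g = fnmatch.fnmatchcase(".g", EXTRAS_ARG)
matches_itself = fnmatch.fnmatchcase(EXTRAS_ARG, EXTRAS_ARG)

# shlex.quote wraps the argument in single quotes because "[" and "]" are
# characters the shell may interpret.
quoted = shlex.quote(EXTRAS_ARG)

print(matches_dot_g, matches_itself, quoted)  # True False '.[gcp]'
```

Quoting the argument, as in `pip install -e '.[gcp]'`, hands the literal text to `pip` regardless of which shell is in use.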

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+Activate your Python virtual environment.  This example uses `virtualenv`. See 
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions regarding setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+```
+
+Install Beam and its dependencies, and build a Beam distribution.
+
+```sh
+> cd beam/sdks/python
+> pip install -r build-requirements.txt
+> pip install -e .[gcp,test]
+> python setup.py sdist
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or specify
+a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \

Review comment:
       Same as the previous section

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available
+in the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in the
+information shown within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to Docker Hub. See

Review comment:
       Can we link to the [Docker installation page](https://www.docker.com/get-started) here? Users might not have Docker installed.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available
+in the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in the
+information shown within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to Docker Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information. Note that you have to fill in the
+information shown within angle brackets (`<` and `>`). Assume your Docker
+repository root is `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>
+> ./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+> docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above commands, just build
+the Java SDK harness container locally using the default values for the
+repository root and the Docker tag.
+
+Activate your Python virtual environment. This example uses `virtualenv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions on setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+```
+
+Install Beam and its dependencies, and build a Beam distribution.
+
+```sh
+> cd beam/sdks/python

Review comment:
       Can we get rid of the `>` from these as well?
   
   I would also wrap `.[gcp,test]` in single quotes for `zsh` users, since `zsh` treats the square brackets as a glob pattern.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-655159675


   cc: @davidwrede 





[GitHub] [beam] davidcavazos commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r451686296



##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(
+          typing.Tuple[bytes, bytes])
+      | beam.WindowInto(beam.window.FixedWindows(15))

Review comment:
       Can we make this into either a constant or another command-line argument with a default value? It's also a good idea to mention the units: are these seconds or minutes?
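
A minimal sketch of the command-line variant, assuming a hypothetical `--window_size` flag and a hypothetical `DEFAULT_WINDOW_SIZE_SECONDS` constant (neither is in the PR). `FixedWindows` takes its size in seconds, so the help text states the unit:

```python
import argparse

# Hypothetical constant name; FixedWindows takes its size in seconds.
DEFAULT_WINDOW_SIZE_SECONDS = 15


def parse_args(argv=None):
  """Parses example-specific flags and leaves the rest for Beam."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--window_size',
      dest='window_size',
      type=int,
      default=DEFAULT_WINDOW_SIZE_SECONDS,
      help='Size of the fixed windows applied before writing to Kafka, '
      'in seconds')
  return parser.parse_known_args(argv)


known_args, pipeline_args = parse_args(
    ['--window_size', '30', '--runner', 'DirectRunner'])
# In the pipeline this would replace the literal 15:
#   beam.WindowInto(beam.window.FixedWindows(known_args.window_size))
```

Unrecognized flags such as `--runner` stay in `pipeline_args`, so they can still be handed to `PipelineOptions` unchanged.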

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)

Review comment:
       Why not use a `with` statement?
   
   ```py
   with beam.Pipeline(options=pipeline_options) as pipeline:
     _ = (
         pipeline
         | beam.io.ReadFromPubSub(...
         ...
     )
   
     _ = (
         pipeline
         | ReadFromKafka(...
         ...
     )
   ```

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):

Review comment:
       In general, for Python samples, we usually have the `run` function accept the actual values it needs instead of `argv`, and we let `__main__` use `argparse` to parse the arguments.
   
   This allows users to more easily import the sample "as a library" and run it from within a script. It also makes it easier to test.
   
   It's also a good idea to show a commented example of what those arguments would look like. Ideally, if they copy-paste the contents of the `run` function and uncomment those examples, it should run, although some inputs, such as a project ID, might need tweaking.
   
   ```py
   def run(bootstrap_servers, topic, pipeline_args):
     # bootstrap_servers = '...'
     # topic = 'kafka_taxirides_realtime'
     # pipeline_args = ['--project', 'my-project', ...]
     ...
   
   if __name__ == '__main__':
     import argparse
   
     parser = argparse.ArgumentParser()
     ...
     args, pipeline_args = parser.parse_known_args()
   
     run(args.bootstrap_servers, args.topic, pipeline_args)
   ```

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(

Review comment:
       Why are we adding an empty bytes key? Is it required by another transform? A comment would be nice.
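
If the key is there because the Kafka write transform consumes key/value pairs (an assumption, suggested by the `typing.Tuple[bytes, bytes]` output hint in the diff), a named helper with a docstring would make that explicit. `to_kafka_record` is a hypothetical name, not something in the PR:

```python
import typing


def to_kafka_record(message: bytes) -> typing.Tuple[bytes, bytes]:
  """Pairs a Pub/Sub payload with an empty key.

  Assumption: the Kafka write transform consumes (key, value) pairs of
  bytes. The taxi ride messages have no natural key, so an empty one is used.
  """
  return (b'', message)


# In the pipeline this would replace the lambda:
#   beam.Map(to_kafka_record).with_output_types(typing.Tuple[bytes, bytes])
print(to_kafka_record(b'{"ride_id": "abc"}'))
```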

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       Is there any reason not to use the shorter version?
   
   ```py
   pipeline_options = PipelineOptions(
       pipeline_args,
       save_main_session=True,
       streaming=True)
   ```

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(
+          typing.Tuple[bytes, bytes])
+      | beam.WindowInto(beam.window.FixedWindows(15))
+      | WriteToKafka(
+          producer_config={'bootstrap.servers': bootstrap_servers},
+          topic=known_args.topic))
+
+  _ = (
+      pipeline
+      | ReadFromKafka(
+          consumer_config={'bootstrap.servers': bootstrap_servers},
+          topics=[known_args.topic])
+      | beam.FlatMap(lambda x: []))

Review comment:
       Why are we getting rid of all the elements?
   
   By running this, do we see the elements via the Kafka transform or should we print the elements?
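
One option, sketched under the assumption that each record read back from Kafka arrives as a `(key, value)` tuple of bytes, is to log the records and pass them through instead of dropping them. `log_record` is a hypothetical helper name:

```python
import logging


def log_record(record):
  """Logs a record read back from Kafka and returns it unchanged.

  Assumption: each record is a (key, value) tuple of bytes.
  """
  key, value = record
  logging.info('Kafka record: key=%r value=%r', key, value)
  return record


# In the pipeline this would replace the FlatMap that drops every element:
#   | beam.Map(log_record)
```

With the log level set to `INFO`, the ride messages would then show up in the worker logs, which makes it visible that the round trip through Kafka worked.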

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',

Review comment:
       I know this is what Kafka calls them, but the name doesn't make it obvious what it means. Maybe rename it to something like `--kafka_servers`? What do you think?
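
A possible middle ground, as a sketch: `argparse` accepts several option strings for one argument, so the flag could answer to both names while the help text explains the Kafka term. The `--kafka_servers` spelling is only the suggestion above, not an existing flag:

```python
import argparse

parser = argparse.ArgumentParser()
# Both spellings set the same attribute, so Kafka users and newcomers can
# each use the name they expect.
parser.add_argument(
    '--kafka_servers',
    '--bootstrap_servers',
    dest='bootstrap_servers',
    required=True,
    help='Comma-separated host:port pairs of the Kafka brokers (Kafka calls '
    'these "bootstrap servers"). Should be accessible by the runner')

args, pipeline_args = parser.parse_known_args(
    ['--kafka_servers', 'localhost:9092', '--runner', 'DirectRunner'])
```

Keeping `dest='bootstrap_servers'` means the rest of the example can keep reading `args.bootstrap_servers` whichever spelling was used.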




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-654607544


   R: @davidcavazos @ihji 





[GitHub] [beam] chamikaramj commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r452546221



##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```

Review comment:
       No (also Beam comes with a Gradle script when cloning).

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 

Review comment:
       Added the same link as Beam quick-start and updated text.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(

Review comment:
       It's required by the Kafka write transform. Added a comment.
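
       For context, a small sketch of what that `Map` step does: the diff above declares the element type as `typing.Tuple[bytes, bytes]`, so each payload is paired with an empty key. The helper name `to_kafka_record` is hypothetical and only used here for illustration.

```python
import typing


def to_kafka_record(payload: bytes) -> typing.Tuple[bytes, bytes]:
  """Pairs a message payload with an empty key (hypothetical helper)."""
  return (b'', payload)


# In the pipeline this would be applied as, for example:
#   beam.Map(to_kafka_record).with_output_types(typing.Tuple[bytes, bytes])
record = to_kafka_record(b'{"ride_id": "abc"}')
print(record)
```

       The empty key matches what the example in this PR does; a real pipeline might instead derive a key from the payload.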

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env

Review comment:
       Done. Though I noticed that we mention virtualenv in both the Beam contribution guide and the Python quickstart.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       Are you sure this works? I don't see any such keyword arguments.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in
+the information within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a java SDK Harness container to Docker Hub. See 

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(
+          typing.Tuple[bytes, bytes])
+      | beam.WindowInto(beam.window.FixedWindows(15))

Review comment:
       Moved to a parameter. I don't think this will be useful as a command-line argument. Added a comment with the unit.
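
       On the unit: `beam.window.FixedWindows` takes its size in seconds, so the `15` in the diff above means 15-second windows. The sketch below only illustrates how a timestamp maps to such a window when the offset is zero; it does not use Beam, and the names `WINDOW_SIZE_SECONDS` and `fixed_window_bounds` are hypothetical.

```python
WINDOW_SIZE_SECONDS = 15  # hypothetical name; the value 15 comes from the diff


def fixed_window_bounds(timestamp, size=WINDOW_SIZE_SECONDS):
  """Returns the [start, end) window, in seconds, that contains timestamp."""
  start = timestamp - (timestamp % size)
  return (start, start + size)


print(fixed_window_bounds(37))  # (30, 45)
```

       A timestamp exactly on a boundary, such as 45, falls into the window that starts there, since the end of each window is exclusive.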

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute pipelines on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in
+the information within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092

Review comment:
       Yes, this is geared towards distributed runners, though users would be able to run the example with Flink/Spark in local mode as well. I think it's better not to mention a local Kafka cluster, so that it isn't confusing and doesn't give the impression that distributed runners will work with one. Updated.
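
       Since the runner's workers have to reach the address passed via `--bootstrap_servers`, it may help to sanity-check the value before launching a job. Below is a minimal sketch, assuming the usual Kafka convention of a comma-separated list of `host:port` entries; the helper name `parse_bootstrap_servers` is hypothetical and not part of this PR or of Beam.

```python
def parse_bootstrap_servers(value):
  """Splits 'host1:9092,host2:9092' into a list of (host, port) tuples.

  Hypothetical helper for illustration; it only validates the string shape
  and does not try to connect to the cluster.
  """
  servers = []
  for entry in value.split(','):
    entry = entry.strip()
    if not entry:
      continue  # Tolerate stray commas such as 'a:1,,b:2'.
    host, sep, port = entry.rpartition(':')
    if not sep or not host or not port.isdigit():
      raise ValueError('Expected host:port, got %r' % entry)
    servers.append((host, int(port)))
  return servers
```

       A value like `localhost:9092` parses fine here but would only be usable by a runner executing on the same machine, which is the confusion the comment above is trying to avoid.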

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Checkout a clone of the Beam Git repo. See 
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your Github username to be `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a java SDK Harness container to Docker Hub. See 
+[here](https://beam.apache.org/documentation/runtime/environments/) for 
+prerequisites and additional information. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root to be `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)

Review comment:
       Done.
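
       For readers following the hunk above: `parse_known_args` is what lets the example keep its own flags (`--bootstrap_servers`, `--topic`) separate from the flags that are handed on to `PipelineOptions`. A minimal sketch of that split using only `argparse`, with no Beam dependency; the function name `split_args` and the sample argument list are illustrative assumptions, not part of the PR.

```python
import argparse


def split_args(argv):
  """Separates the example's own flags from flags meant for the runner."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers', dest='bootstrap_servers', required=True)
  parser.add_argument(
      '--topic', dest='topic', default='kafka_taxirides_realtime')
  # Unrecognized flags are returned as a list instead of raising an error,
  # so they can be passed on unchanged (in the example, to PipelineOptions).
  return parser.parse_known_args(argv)


known_args, pipeline_args = split_args([
    '--runner', 'DataflowRunner',
    '--bootstrap_servers', 'KAFKA_ADDRESS:9092',
    '--experiments=use_runner_v2',
])
```

       Here `known_args` carries the Kafka settings (with `topic` falling back to its default), while `pipeline_args` keeps the runner flags in their original order.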

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java in your system and make sure that `java` command is available in 
+the environment.
+
+```sh
+>java --version
+> <Should print information regarding the installed Java version>
+```
+
+## Setup the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipelines in a single computer (for 
+example, portable DirectRunner or Spark/Flink runners in local mode), you can 
+setup a local Kafka cluster running in the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can setup a Kafka 
+cluster in GCE. See 
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery) 
+for step by step instructions for this.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require 
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for 
+instructions for setting up Dataflow.
+
+Setup a virtual environment for running Beam Python programs. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites. 
+Dataflow requires the `gcp` tag. Assuming your work directory to be
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install -e .[gcp]
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or 
+specify a Kafka topic name. Following command assumes Dataflow. See 
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on 
+running Beam Python programs on other runners. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, so
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Checkout a clone of the Beam Git repo. See 
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your Github username to be `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a java SDK Harness container to Docker Hub. See 
+[here](https://beam.apache.org/documentation/runtime/environments/) for 
+prerequisites and additional information. Note that you have to fill in 
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root to be `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>
+> ./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+> docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of above command just build the
+Java SDK harness container locally using the default values for repository root
+and the docker tag.
+
+Activate your Python virtual environment.  This example uses `virtualenv`. See 
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions regarding setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+> java -version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipeline on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in the
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to Docker Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information. Note that you have to fill in the
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root is `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>
+> ./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+> docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above commands, just build
+the Java SDK harness container locally using the default values for the repository
+root and the Docker tag.
+
+Create and activate your Python virtual environment. This example uses `virtualenv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions on setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+```
+
+Install Beam and dependencies and build a Beam distribution.
+
+```sh
+> cd beam/sdks/python

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',

Review comment:
       I think it's better to keep this as 'bootstrap_servers' since that's well known for any Kafka users.
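
For context, a minimal sketch of how the flag could map onto the Kafka client property of the same name. The helper name `parse_kafka_args` is hypothetical and not part of the PR; the flag names and the default topic are taken from the example under review.

```python
import argparse


def parse_kafka_args(argv):
  """Splits the example-specific flags from the remaining pipeline flags.

  Hypothetical helper for illustration only.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      dest='bootstrap_servers',
      required=True,
      help='Bootstrap servers for the Kafka cluster.')
  parser.add_argument(
      '--topic', dest='topic', default='kafka_taxirides_realtime')
  # Flags the parser does not know (runner, project, ...) are returned
  # untouched so they can be handed to PipelineOptions.
  known_args, pipeline_args = parser.parse_known_args(argv)
  # The flag name mirrors the Kafka property it is copied into.
  kafka_config = {'bootstrap.servers': known_args.bootstrap_servers}
  return known_args.topic, kafka_config, pipeline_args
```

Keeping the flag name close to the `bootstrap.servers` property means Kafka users can guess what value is expected without reading the help text.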

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+> java -version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipeline on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in the
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+
+  pipeline = beam.Pipeline(options=pipeline_options)
+  bootstrap_servers = known_args.bootstrap_servers
+  _ = (
+      pipeline
+      | beam.io.ReadFromPubSub(
+          topic='projects/pubsub-public-data/topics/taxirides-realtime').
+      with_output_types(bytes)
+      | beam.Map(lambda x: (b'', x)).with_output_types(
+          typing.Tuple[bytes, bytes])
+      | beam.WindowInto(beam.window.FixedWindows(15))
+      | WriteToKafka(
+          producer_config={'bootstrap.servers': bootstrap_servers},
+          topic=known_args.topic))
+
+  _ = (
+      pipeline
+      | ReadFromKafka(
+          consumer_config={'bootstrap.servers': bootstrap_servers},
+          topics=[known_args.topic])
+      | beam.FlatMap(lambda x: []))

Review comment:
       Updated to parse and print elements.
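
For illustration, one way parsing and printing could look. This is a sketch, not the code in the PR: the helper names `parse_taxi_record` and `log_ride` are hypothetical, and it assumes that each Kafka record arrives as a `(key, value)` tuple of bytes and that the value is a UTF-8 encoded JSON object.

```python
import json
import logging


def parse_taxi_record(record):
  """Decodes one (key, value) Kafka record into a dict.

  Assumption: the value is a UTF-8 encoded JSON object.
  """
  # The example writes an empty key, so only the value is used here.
  _, value = record
  return json.loads(value.decode('utf-8'))


def log_ride(ride):
  """Logs a parsed ride and returns it so the step can be chained."""
  logging.info('Ride: %s', ride)
  return ride
```

In the pipeline these would take the place of the `beam.FlatMap(lambda x: [])` step, for example `| beam.Map(parse_taxi_record) | beam.Map(log_ride)`.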

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,164 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the PubSub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install Java on your system and make sure that the `java` command is available in
+the environment.
+
+```sh
+> java -version
+> <Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to. There are a few options.
+
+* For local runners that execute the pipeline on a single computer (for
+example, portable DirectRunner or Spark/Flink runners in local mode), you can
+set up a local Kafka cluster running on the same computer.
+* For Dataflow or portable Spark/Flink in distributed mode, you can set up a Kafka
+cluster in GCE. See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions.
+
+Let's assume that the IP address of the node running the Kafka cluster is
+`KAFKA_ADDRESS` and the port is `9092`.
+
+```sh
+> export BOOTSTRAP_SERVER=KAFKA_ADDRESS:9092
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2)
+and Beam 2.22.0 or later. See
+[here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag. Assume your work directory is
+`/path/to/work`.
+
+```sh
+> export WORK_DIRECTORY=/path/to/work
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+> pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners. Note that you have to fill in the
+information mentioned within angle brackets (`<` and `>`).
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \
+   --num_workers 1 --job_name <job name> --bootstrap_servers $BOOTSTRAP_SERVER --experiments=use_runner_v2
+```
+
+## Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+> cd $WORK_DIRECTORY
+> git clone git@github.com:${GITHUB_USERNAME}/beam
+> cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+> ./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to Docker Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information. Note that you have to fill in the
+information mentioned within angle brackets (`<` and `>`). Assume your Docker
+repository root is `<Docker repository root>`.
+
+```sh
+> export DOCKER_ROOT=<Docker repository root>
+> ./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+> docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above commands, just build
+the Java SDK harness container locally using the default values for the repository
+root and the Docker tag.
+
+Create and activate your Python virtual environment. This example uses `virtualenv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions on setting up other types of Python virtual environments.
+
+```sh
+> cd $WORK_DIRECTORY
+> mkdir kafka_env
+> virtualenv kafka_env
+> . kafka_env/bin/activate
+```
+
+Install Beam and dependencies and build a Beam distribution.
+
+```sh
+> cd beam/sdks/python
+> pip install -r build-requirements.txt
+> pip install -e .[gcp,test]
+> python setup.py sdist
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or specify
+a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+```sh
+>  python -m apache_beam.examples.kafkataxi.kafka_taxi --runner DataflowRunner --temp_location <GCS temp location> --project <GCP project>  --region <region> \

Review comment:
       Done.







[GitHub] [beam] chamikaramj commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r458298289



##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)
+on your system and make sure that the `JAVA_HOME` environment variable points to
+your JDK installation. Make sure that the `java` command is available in
+the environment.
+
+```sh
+java -version
+<Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to set up the Kafka cluster in
+[GCE](https://cloud.google.com/compute). See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions on setting up a single-node Kafka cluster in GCE.
+When using Dataflow, consider starting the Kafka cluster in the region where the
+Dataflow pipeline will be running. See
+[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart)
+of the Kafka cluster is `123.45.67.89` and the port is `9092`.
+
+```sh
+export BOOTSTRAP_SERVER="123.45.67.89:9092"
+```
+
+## Running the example on the latest released Beam version
+
+Perform Beam runner specific setup. Note that cross-language transforms require
+portable implementations of Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
+See [here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag when installing Beam.
+
+```sh
+python -m venv env
+source env/bin/activate
+pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+Note that this example is not available in Beam versions before 2.24.0, hence
+you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1" 
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="5"
+
+python -m apache_beam.examples.kafkataxi.kafka_taxi \
+  --runner DataflowRunner \
+  --temp_location $TEMP_LOCATION \
+  --project $PROJECT \
+  --region $REGION \
+  --num_workers $NUM_WORKERS \
+  --job_name $JOB_NAME \
+  --bootstrap_servers $BOOTSTRAP_SERVER \
+  --experiments=use_runner_v2
+```
+
+## *(Optional)*  Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+git clone git@github.com:${GITHUB_USERNAME}/beam
+cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to [Docker](https://www.docker.com/get-started)
+Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information.
+
+```sh
+export DOCKER_ROOT="Your Docker Repository Root"
+./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above commands, just build
+the Java SDK harness container locally using the default values for the repository
+root and the Docker tag.
+
+Create and activate your Python virtual environment. This example uses `venv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions on setting up other types of Python virtual environments.
+
+```sh
+cd ..  # Creating the virtual environment in the top level work directory.
+python -m venv env
+source env/bin/activate
+```
+
+Install Beam and dependencies and build a Beam distribution.
+
+```sh
+cd beam/sdks/python
+pip install -r build-requirements.txt
+pip install -e '.[gcp]'
+python setup.py sdist
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or specify
+a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+See [here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1" 
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="5"

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)
+on your system and make sure that the `JAVA_HOME` environment variable points to
+your JDK installation. Make sure that the `java` command is available in
+the environment.
+
+```sh
+java -version
+<Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to set up the Kafka cluster in
+[GCE](https://cloud.google.com/compute). See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions on setting up a single-node Kafka cluster in GCE.
+When using Dataflow, consider starting the Kafka cluster in the region where
+the Dataflow pipeline will be running. See
+[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart)
+of the Kafka cluster is `123.45.67.89` and the port is `9092`.
+
+```sh
+export BOOTSTRAP_SERVER="123.45.67.89:9092"
+```
+
+## Running the example on the latest released Beam version
+
+Perform the Beam runner-specific setup. Note that cross-language transforms require
+portable implementations of the Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
+See [here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions on setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` tag when installing Beam.
+
+```sh
+python -m venv env
+source env/bin/activate
+pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+Note that this example is not available in Beam versions before 2.24.0 hence

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',
+  #                  '--temp_location', 'my-temp-location',
+  #                  '--region', 'my-region', '--num_workers',
+  #                  'my-num-workers', '--experiments', 'use_runner_v2']
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/README.md
##########
@@ -0,0 +1,190 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)

Review comment:
       I'm using the same link as Beam quickstart: https://beam.apache.org/get-started/quickstart-java/

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',
+  #                  '--temp_location', 'my-temp-location',
+  #                  '--region', 'my-region', '--num_workers',
+  #                  'my-num-workers', '--experiments', 'use_runner_v2']
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  pipeline_options.view_as(StandardOptions).streaming = True
+  window_size = 15  # size of the Window in seconds.
+
+  def log_ride(ride_bytes):
+    # Converting bytes record from Kafka to a dictionary.
+    import ast
+    ride = ast.literal_eval(ride_bytes.decode("UTF-8"))
+    logging.info(
+        'Found ride at latitude %r and longitude %r with %r '
+        'passengers',
+        ride['latitude'],
+        ride['longitude'],
+        ride['passenger_count'])
+
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    _ = (
+        pipeline
+        | beam.io.ReadFromPubSub(
+            topic='projects/pubsub-public-data/topics/taxirides-realtime').
+        with_output_types(bytes)
+        | beam.Map(lambda x: (b'', x)).with_output_types(
+            typing.Tuple[bytes, bytes])  # The Kafka write transform expects KVs.
+        | beam.WindowInto(beam.window.FixedWindows(window_size))
+        | WriteToKafka(
+            producer_config={'bootstrap.servers': bootstrap_servers},
+            topic=topic))
+
+    _ = (
+        pipeline
+        | ReadFromKafka(
+            consumer_config={'bootstrap.servers': bootstrap_servers},
+            topics=[topic])
+        | beam.FlatMap(lambda kv: log_ride(kv[1])))
+
+    # Exiting the `with` block runs the pipeline and waits for it to finish.

Review comment:
       Done.
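
       For reference, the record handling in the hunk above can be exercised
       without a Kafka cluster. The following is a minimal sketch and not part
       of the PR: `to_kafka_kv` and `parse_ride` are hypothetical helper names,
       and the sample payload is made up to resemble a taxi ride message. It
       mirrors the `(b'', x)` key/value wrapping applied before `WriteToKafka`
       and the `ast.literal_eval` decoding done in `log_ride`.

```python
import ast
import typing


def to_kafka_kv(payload: bytes) -> typing.Tuple[bytes, bytes]:
  """Wraps a raw payload in a (key, value) pair with an empty key."""
  return (b'', payload)


def parse_ride(ride_bytes: bytes) -> dict:
  """Decodes a UTF-8 record and evaluates it as a Python literal."""
  return ast.literal_eval(ride_bytes.decode('UTF-8'))


# Made-up sample record; real messages carry more fields.
sample = b'{"latitude": 40.7, "longitude": -74.0, "passenger_count": 2}'
key, value = to_kafka_kv(sample)
ride = parse_ride(value)
print(ride['latitude'], ride['longitude'], ride['passenger_count'])
```

       Note that `ast.literal_eval` only accepts Python literals, so a payload
       containing JSON-only tokens such as `true` or `null` would raise an
       error; `json.loads` may be a safer choice if such values can occur.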

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,105 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '...'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project', '--runner', 'DataflowRunner',

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] chamikaramj commented on pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on pull request #12188:
URL: https://github.com/apache/beam/pull/12188#issuecomment-661236298


   Run Python PreCommit





[GitHub] [beam] davidcavazos commented on a change in pull request #12188: [BEAM-10411] Adds an example that use Python cross-language Kafka transforms.

Posted by GitBox <gi...@apache.org>.
davidcavazos commented on a change in pull request #12188:
URL: https://github.com/apache/beam/pull/12188#discussion_r457723919



##########
File path: sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
##########
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+
+
+def run(argv=None):
+  """Main entry point; defines and runs the Kafka taxi pipeline."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args(argv)
+
+  pipeline_options = PipelineOptions(pipeline_args)

Review comment:
       They're not hardcoded, they are "inferred" from the `**kwargs` passed to `PipelineOptions`.
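
       As a rough illustration of how the flags in the hunk above are split
       (a sketch using only `argparse`; the sample flag values are
       placeholders): `parse_known_args` keeps the options the example
       declares and returns everything else untouched, and that remainder is
       the list handed to `PipelineOptions`.

```python
import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    '--bootstrap_servers', dest='bootstrap_servers', required=True)
parser.add_argument(
    '--topic', dest='topic', default='kafka_taxirides_realtime')

# Placeholder command line mixing example flags and runner flags.
argv = [
    '--bootstrap_servers', '123.45.67.89:9092',
    '--runner', 'DataflowRunner',
    '--region', 'us-central1',
]
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args.bootstrap_servers)  # flag declared by the example
print(known_args.topic)  # falls back to the declared default
print(pipeline_args)  # everything argparse did not recognize
```

       Nothing in the parser names `--runner` or `--region`; those are left
       for `PipelineOptions` to interpret.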



