Posted to commits@beam.apache.org by ch...@apache.org on 2020/07/21 21:00:12 UTC
[beam] branch master updated: [BEAM-10411] Adds an example that use
Python cross-language Kafka transforms. (#12188)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6761a5b [BEAM-10411] Adds an example that use Python cross-language Kafka transforms. (#12188)
6761a5b is described below
commit 6761a5bff9e2d28dad056f3fd4fa381c828238e1
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Jul 21 13:59:44 2020 -0700
[BEAM-10411] Adds an example that use Python cross-language Kafka transforms. (#12188)
* Adds an example that use Python cross-language Kafka transforms.
* Address reviewer comments
* Addresses reviewer comments
---
.../apache_beam/examples/kafkataxi/README.md | 189 +++++++++++++++++++++
.../apache_beam/examples/kafkataxi/__init__.py | 18 ++
.../apache_beam/examples/kafkataxi/kafka_taxi.py | 102 +++++++++++
3 files changed, 309 insertions(+)
diff --git a/sdks/python/apache_beam/examples/kafkataxi/README.md b/sdks/python/apache_beam/examples/kafkataxi/README.md
new file mode 100644
index 0000000..1af7eb8
--- /dev/null
+++ b/sdks/python/apache_beam/examples/kafkataxi/README.md
@@ -0,0 +1,189 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Python KafkaIO Example
+
+This example reads from the Google Cloud Pub/Sub NYC Taxi stream described
+[here](https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon), writes
+to a given Kafka topic, and reads back from the same Kafka topic. This example
+uses cross-language transforms available in
+[kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py).
+Transforms are implemented in Java and are available
+[here](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java).
+
+## Prerequisites
+
+Install [Java Development Kit (JDK) version 8](https://www.oracle.com/java/technologies/javase-downloads.html)
+on your system and make sure that the `JAVA_HOME` environment variable points to
+your JDK installation. Make sure that the `java` command is available in
+the environment.
+
+```sh
+java -version
+<Should print information regarding the installed Java version>
+```
+
+## Set up the Kafka cluster
+
+This example requires users to set up a Kafka cluster that the Beam runner
+executing the pipeline has access to.
+
+See [here](https://kafka.apache.org/quickstart) for general instructions on
+setting up a Kafka cluster. One option is to set up the Kafka cluster in
+[GCE](https://cloud.google.com/compute). See
+[here](https://github.com/GoogleCloudPlatform/java-docs-samples/tree/master/dataflow/flex-templates/kafka_to_bigquery)
+for step-by-step instructions on setting up a single-node Kafka cluster in GCE.
+When using Dataflow, consider starting the Kafka cluster in the region where the
+Dataflow pipeline will be running. See
+[here](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)
+for more details on selecting a GCP region for Dataflow.
+
+Let's assume that the IP address of one of the [bootstrap servers](https://kafka.apache.org/quickstart)
+of the Kafka cluster is `123.45.67.89` and the port is `9092`.
+
+```sh
+export BOOTSTRAP_SERVER="123.45.67.89:9092"
+```
+
+## Running the example on the latest released Beam version
+
+Perform the setup specific to your Beam runner.
+
+ℹ️ Note that cross-language transforms require
+portable implementations of the Spark/Flink/Direct runners. Dataflow requires
+[runner V2](https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2).
+See [here](https://beam.apache.org/documentation/runners/dataflow/) for
+instructions for setting up Dataflow.
+
+Set up a virtual environment for running Beam Python programs. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for prerequisites.
+Dataflow requires the `gcp` extra when installing Beam.
+
+```sh
+python -m venv env
+source env/bin/activate
+pip install 'apache-beam[gcp]'
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or
+specify a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+ℹ️ Note that this example is not available in Beam versions before 2.24.0. With
+those versions, you'll have to either get the example program from Beam or follow the steps
+provided in the section *Running the Example from a Beam Git Clone*.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1"
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="5"
+
+python -m apache_beam.examples.kafkataxi.kafka_taxi \
+ --runner DataflowRunner \
+ --temp_location $TEMP_LOCATION \
+ --project $PROJECT \
+ --region $REGION \
+ --num_workers $NUM_WORKERS \
+ --job_name $JOB_NAME \
+ --bootstrap_servers $BOOTSTRAP_SERVER \
+ --experiments=use_runner_v2
+```
+
+## *(Optional)* Running the Example from a Beam Git Clone
+
+Running this example from a Beam Git clone requires some additional steps.
+
+Check out a clone of the Beam Git repo. See
+[here](https://beam.apache.org/contribute/) for prerequisites.
+
+Assume your GitHub username is `GITHUB_USERNAME`.
+
+```sh
+git clone git@github.com:${GITHUB_USERNAME}/beam
+cd beam
+```
+
+Build the IO expansion service jar.
+
+```sh
+./gradlew :sdks:java:io:expansion-service:build
+```
+
+Push a Java SDK harness container to [Docker](https://www.docker.com/get-started)
+Hub. See
+[here](https://beam.apache.org/documentation/runtime/environments/) for
+prerequisites and additional information.
+
+```sh
+export DOCKER_ROOT="Your Docker Repository Root"
+./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
+docker push $DOCKER_ROOT/beam_java_sdk:latest
+```
+
+For portable Flink/Spark in local mode, instead of the above command, just build the
+Java SDK harness container locally using the default values for the repository root
+and the Docker tag.
+
+Activate your Python virtual environment. This example uses `venv`. See
+[here](https://cwiki.apache.org/confluence/display/BEAM/Python+Tips) for
+instructions regarding setting up other types of Python virtual environments.
+
+```sh
+cd .. # Creating the virtual environment in the top level work directory.
+python -m venv env
+source env/bin/activate
+```
+
+Install Beam and dependencies and build a Beam distribution.
+
+```sh
+cd beam/sdks/python
+pip install -r build-requirements.txt
+pip install -e '.[gcp]'
+python setup.py sdist
+```
+
+Run the Beam pipeline. You can either use the default Kafka topic name or specify
+a Kafka topic name. The following command assumes Dataflow. See
+[here](https://beam.apache.org/get-started/quickstart-py/) for instructions on
+running Beam Python programs on other runners.
+
+```sh
+export PROJECT="$(gcloud config get-value project)"
+export TEMP_LOCATION="gs://MY-BUCKET/temp"
+export REGION="us-central1"
+export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
+export NUM_WORKERS="5"
+export PYTHON_DISTRIBUTION="dist/'Name of Python distribution'"
+
+python -m apache_beam.examples.kafkataxi.kafka_taxi \
+ --runner DataflowRunner \
+ --temp_location $TEMP_LOCATION \
+ --project $PROJECT \
+ --region $REGION \
+ --sdk_location $PYTHON_DISTRIBUTION \
+ --num_workers $NUM_WORKERS \
+ --job_name $JOB_NAME \
+ --bootstrap_servers $BOOTSTRAP_SERVER \
+ --sdk_harness_container_image_overrides ".*java.*,${DOCKER_ROOT}/beam_java_sdk:latest" \
+ --experiments=use_runner_v2
+```
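For reference, Kafka's `bootstrap.servers` setting takes a comma-separated list of `host:port` entries, which is the format the `BOOTSTRAP_SERVER` value in the README needs to follow. The following is a minimal Python sketch for checking such a value before launching the pipeline. `parse_bootstrap_servers` is a hypothetical helper, not part of Beam or of this commit, and it deliberately does not handle bracketed IPv6 addresses.

```python
def parse_bootstrap_servers(value):
    """Split 'host1:port1,host2:port2' into a list of (host, port) tuples.

    Hypothetical helper for illustration only; IPv6 literals are not supported.
    """
    servers = []
    for raw_entry in value.split(","):
        entry = raw_entry.strip()
        # Each entry must contain exactly one colon separating host and port.
        if entry.count(":") != 1:
            raise ValueError("expected host:port, got %r" % entry)
        host, port = entry.split(":")
        if not host or not port.isdigit():
            raise ValueError("expected host:port, got %r" % entry)
        servers.append((host, int(port)))
    return servers


if __name__ == "__main__":
    print(parse_bootstrap_servers("123.45.67.89:9092"))
```

A value with two colons in a single entry, such as `host:123:9092`, raises `ValueError` under this sketch, so a malformed address can be caught before the job is submitted.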
diff --git a/sdks/python/apache_beam/examples/kafkataxi/__init__.py b/sdks/python/apache_beam/examples/kafkataxi/__init__.py
new file mode 100644
index 0000000..6569e3f
--- /dev/null
+++ b/sdks/python/apache_beam/examples/kafkataxi/__init__.py
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
diff --git a/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py b/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
new file mode 100644
index 0000000..f1fc646
--- /dev/null
+++ b/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
@@ -0,0 +1,102 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that writes to and reads from Kafka.
+
+ This example reads from the PubSub NYC Taxi stream described in
+ https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, writes to a
+ given Kafka topic and reads back from the same Kafka topic.
+ """
+
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import logging
+import typing
+
+import apache_beam as beam
+from apache_beam.io.kafka import ReadFromKafka
+from apache_beam.io.kafka import WriteToKafka
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+def run(bootstrap_servers, topic, pipeline_args):
+  # bootstrap_servers = '123.45.67.89:9092'
+  # topic = 'kafka_taxirides_realtime'
+  # pipeline_args = ['--project', 'my-project',
+  #                  '--runner', 'DataflowRunner',
+  #                  '--temp_location', 'my-temp-location',
+  #                  '--region', 'my-region',
+  #                  '--num_workers', 'my-num-workers',
+  #                  '--experiments', 'use_runner_v2']
+
+  pipeline_options = PipelineOptions(
+      pipeline_args, save_main_session=True, streaming=True)
+  window_size = 15  # size of the Window in seconds.
+
+  def log_ride(ride_bytes):
+    # Converting bytes record from Kafka to a dictionary.
+    import ast
+    ride = ast.literal_eval(ride_bytes.decode("UTF-8"))
+    logging.info(
+        'Found ride at latitude %r and longitude %r with %r '
+        'passengers',
+        ride['latitude'],
+        ride['longitude'],
+        ride['passenger_count'])
+
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    _ = (
+        pipeline
+        | beam.io.ReadFromPubSub(
+            topic='projects/pubsub-public-data/topics/taxirides-realtime').
+        with_output_types(bytes)
+        | beam.Map(lambda x: (b'', x)).with_output_types(
+            typing.Tuple[bytes, bytes])  # The Kafka write transform expects KVs.
+        | beam.WindowInto(beam.window.FixedWindows(window_size))
+        | WriteToKafka(
+            producer_config={'bootstrap.servers': bootstrap_servers},
+            topic=topic))
+
+    _ = (
+        pipeline
+        | ReadFromKafka(
+            consumer_config={'bootstrap.servers': bootstrap_servers},
+            topics=[topic])
+        | beam.FlatMap(lambda kv: log_ride(kv[1])))
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  import argparse
+
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+      '--bootstrap_servers',
+      dest='bootstrap_servers',
+      required=True,
+      help='Bootstrap servers for the Kafka cluster. Should be accessible by '
+      'the runner')
+  parser.add_argument(
+      '--topic',
+      dest='topic',
+      default='kafka_taxirides_realtime',
+      help='Kafka topic to write to and read from')
+  known_args, pipeline_args = parser.parse_known_args()
+
+  run(known_args.bootstrap_servers, known_args.topic, pipeline_args)
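
As an illustration of the decoding step inside `log_ride`, here is a small standalone sketch that can be run without Kafka or Beam. The payload is invented for this example (only the three field names are taken from `log_ride`), and `decode_ride` is a hypothetical name that does not appear in the commit.

```python
import ast
import json

# Invented sample record; real messages on the taxi topic may carry more fields.
sample = b'{"latitude": 40.7, "longitude": -74.0, "passenger_count": 2}'


def decode_ride(ride_bytes):
    """Decode a UTF-8 encoded record into a dict, mirroring log_ride."""
    return ast.literal_eval(ride_bytes.decode("UTF-8"))


if __name__ == "__main__":
    ride = decode_ride(sample)
    print(ride["latitude"], ride["longitude"], ride["passenger_count"])
    # For this payload, the JSON parser yields the same dictionary.
    print(ride == json.loads(sample.decode("UTF-8")))
```

One caveat with this approach: `ast.literal_eval` accepts Python literals only, so a record containing the JSON literals `true`, `false`, or `null` would raise `ValueError`, whereas `json.loads` would parse it.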